diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/DataSourceActionRequest.java b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/DataSourceActionRequest.java new file mode 100644 index 0000000000..0756edd129 --- /dev/null +++ b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/DataSourceActionRequest.java @@ -0,0 +1,37 @@ +package org.opensearch.sql.datasources.model.transport; + +import lombok.Getter; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.sql.datasource.model.DataSourceMetadata; + +import java.io.IOException; +import java.util.Map; + +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; +import static org.opensearch.sql.datasources.utils.XContentParserUtils.CONNECTOR_FIELD; +import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD; + +public class DataSourceActionRequest extends ActionRequest { + + public DataSourceActionRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public DataSourceActionRequest() { + } + +// public DataSourceMetadata getSourceMetadata() { +// return null; +// } +// +// public Map getDataSourceData() { +// return Map.of(); +// } +} diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/DataSourceActionResponse.java b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/DataSourceActionResponse.java new file mode 100644 index 0000000000..0562782095 --- /dev/null +++ b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/DataSourceActionResponse.java @@ -0,0 +1,25 @@ +package org.opensearch.sql.datasources.model.transport; + +import lombok.Getter; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class DataSourceActionResponse extends ActionResponse { + @Getter + String result; + + public DataSourceActionResponse() { + } + + public DataSourceActionResponse(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + + } +} \ No newline at end of file diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java index 9443ea561e..4953c23f2a 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java @@ -18,7 +18,7 @@ import org.opensearch.action.ActionRequestValidationException; import org.opensearch.core.common.io.stream.StreamInput; -public class PatchDataSourceActionRequest extends ActionRequest { +public class PatchDataSourceActionRequest extends DataSourceActionRequest { @Getter private Map dataSourceData; diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionResponse.java b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionResponse.java index 18873a6731..e087a23547 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionResponse.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionResponse.java @@ -15,7 +15,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; @RequiredArgsConstructor -public class PatchDataSourceActionResponse extends ActionResponse { +public class PatchDataSourceActionResponse extends DataSourceActionResponse { @Getter private final String result; diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/UpdateDataSourceActionRequest.java b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/UpdateDataSourceActionRequest.java index b502f348e2..7c2d32f9c6 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/UpdateDataSourceActionRequest.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/UpdateDataSourceActionRequest.java @@ -16,7 +16,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.sql.datasource.model.DataSourceMetadata; -public class UpdateDataSourceActionRequest extends ActionRequest { +public class UpdateDataSourceActionRequest extends DataSourceActionRequest { @Getter private DataSourceMetadata dataSourceMetadata; diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/UpdateDataSourceActionResponse.java b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/UpdateDataSourceActionResponse.java index 0be992d067..6029a856f5 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/UpdateDataSourceActionResponse.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/model/transport/UpdateDataSourceActionResponse.java @@ -15,7 +15,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; @RequiredArgsConstructor -public class UpdateDataSourceActionResponse extends ActionResponse { +public class UpdateDataSourceActionResponse extends DataSourceActionResponse { @Getter private final String result; diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java index c207f55738..f6a3a98c31 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java @@ -197,12 +197,12 @@ private RestChannelConsumer executeUpdateRequest(RestRequest restRequest, NodeCl nodeClient, () -> nodeClient.execute( - TransportUpdateDataSourceAction.ACTION_TYPE, + TransportDataSourceAction.ACTION_TYPE, new UpdateDataSourceActionRequest(dataSourceMetadata), new ActionListener<>() { @Override public void onResponse( - UpdateDataSourceActionResponse updateDataSourceActionResponse) { + DataSourceActionResponse updateDataSourceActionResponse) { restChannel.sendResponse( new BytesRestResponse( RestStatus.OK, @@ -210,7 +210,7 @@ public void onResponse( updateDataSourceActionResponse.getResult())); } - @Override + @Override public void onFailure(Exception e) { handleException(e, restChannel); } @@ -224,13 +224,14 @@ private RestChannelConsumer executePatchRequest(RestRequest restRequest, NodeCli Scheduler.schedule( nodeClient, () -> + { nodeClient.execute( - TransportPatchDataSourceAction.ACTION_TYPE, + TransportDataSourceAction.ACTION_TYPE, new PatchDataSourceActionRequest(dataSourceData), new ActionListener<>() { @Override public void onResponse( - PatchDataSourceActionResponse patchDataSourceActionResponse) { + DataSourceActionResponse patchDataSourceActionResponse) { restChannel.sendResponse( new BytesRestResponse( RestStatus.OK, @@ -242,7 +243,8 @@ public void onResponse( public void onFailure(Exception e) { handleException(e, restChannel); } - })); + }); + }); } private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) { diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDataSourceAction.java new file mode 100644 index 0000000000..9efd2940cd --- /dev/null +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDataSourceAction.java @@ -0,0 +1,90 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.datasources.transport; + +import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD; +import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasources.model.transport.*; +import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class TransportDataSourceAction + extends HandledTransportAction { + + public static final String NAME = "cluster:admin/opensearch/ql/datasources/update"; + public static final ActionType ACTION_TYPE = + new ActionType<>(NAME, DataSourceActionResponse::new); + + private DataSourceService dataSourceService; + + /** + * TransportUpdateDataSourceAction action for updating datasource. + * + * @param transportService transportService. + * @param actionFilters actionFilters. + * @param dataSourceService dataSourceService. + */ + @Inject + public TransportDataSourceAction( + TransportService transportService, + ActionFilters actionFilters, + DataSourceServiceImpl dataSourceService) { + super( + TransportDataSourceAction.NAME, + transportService, + actionFilters, + DataSourceActionRequest::new); + this.dataSourceService = dataSourceService; + } + + @Override + protected void doExecute( + Task task, + DataSourceActionRequest request, + ActionListener actionListener) { + try { + if (request instanceof UpdateDataSourceActionRequest) { + UpdateDataSourceActionRequest request1 = (UpdateDataSourceActionRequest) request; + dataSourceService.updateDataSource(request1.getDataSourceMetadata()); + String responseContent = + new JsonResponseFormatter(PRETTY) { + @Override + protected Object buildJsonObject(String response) { + return response; + } + }.format("Updated DataSource with name " + request1.getDataSourceMetadata().getName()); + actionListener.onResponse(new UpdateDataSourceActionResponse(responseContent)); + } else if (request instanceof PatchDataSourceActionRequest) { + PatchDataSourceActionRequest request2 = (PatchDataSourceActionRequest) request; + dataSourceService.patchDataSource(request2.getDataSourceData()); + String responseContent = + new JsonResponseFormatter(PRETTY) { + @Override + protected Object buildJsonObject(String response) { + return response; + } + }.format("Updated DataSource with name " + request2.getDataSourceData().get(NAME_FIELD)); + actionListener.onResponse(new PatchDataSourceActionResponse(responseContent)); + } else { + throw new IllegalArgumentException("Unexpected request type"); + } +// actionListener.onResponse(new UpdateDataSourceActionResponse(responseContent)); + } catch (Exception e) { + actionListener.onFailure(e); + } + } +} diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java deleted file mode 100644 index fefd0f3a01..0000000000 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.opensearch.sql.datasources.transport; - -import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; - -import org.opensearch.action.ActionType; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.common.inject.Inject; -import org.opensearch.core.action.ActionListener; -import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest; -import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse; -import org.opensearch.sql.datasources.service.DataSourceServiceImpl; -import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; -import org.opensearch.tasks.Task; -import org.opensearch.transport.TransportService; - -public class TransportUpdateDataSourceAction - extends HandledTransportAction { - - public static final String NAME = "cluster:admin/opensearch/ql/datasources/update"; - public static final ActionType ACTION_TYPE = - new ActionType<>(NAME, UpdateDataSourceActionResponse::new); - - private DataSourceService dataSourceService; - - /** - * TransportUpdateDataSourceAction action for updating datasource. - * - * @param transportService transportService. - * @param actionFilters actionFilters. - * @param dataSourceService dataSourceService. - */ - @Inject - public TransportUpdateDataSourceAction( - TransportService transportService, - ActionFilters actionFilters, - DataSourceServiceImpl dataSourceService) { - super( - TransportUpdateDataSourceAction.NAME, - transportService, - actionFilters, - UpdateDataSourceActionRequest::new); - this.dataSourceService = dataSourceService; - } - - @Override - protected void doExecute( - Task task, - UpdateDataSourceActionRequest request, - ActionListener actionListener) { - try { - dataSourceService.updateDataSource(request.getDataSourceMetadata()); - String responseContent = - new JsonResponseFormatter(PRETTY) { - @Override - protected Object buildJsonObject(String response) { - return response; - } - }.format("Updated DataSource with name " + request.getDataSourceMetadata().getName()); - actionListener.onResponse(new UpdateDataSourceActionResponse(responseContent)); - } catch (Exception e) { - actionListener.onFailure(e); - } - } -} diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDataSourceActionTest.java similarity index 92% rename from datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java rename to datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDataSourceActionTest.java index ffcd526f87..c0e287ee44 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportDataSourceActionTest.java @@ -18,6 +18,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.datasources.model.transport.DataSourceActionResponse; import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; @@ -25,13 +26,13 @@ import org.opensearch.transport.TransportService; @ExtendWith(MockitoExtension.class) -public class TransportUpdateDataSourceActionTest { +public class TransportDataSourceActionTest { @Mock private TransportService transportService; - @Mock private TransportUpdateDataSourceAction action; + @Mock private TransportDataSourceAction action; @Mock private DataSourceServiceImpl dataSourceService; @Mock private Task task; - @Mock private ActionListener actionListener; + @Mock private ActionListener actionListener; @Captor private ArgumentCaptor @@ -42,7 +43,7 @@ public class TransportUpdateDataSourceActionTest { @BeforeEach public void setUp() { action = - new TransportUpdateDataSourceAction( + new TransportDataSourceAction( transportService, new ActionFilters(new HashSet<>()), dataSourceService); } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 7b49e04a41..94b3718989 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -170,12 +170,12 @@ public List getRestHandlers( TransportGetDataSourceAction.class), new ActionHandler<>( new ActionType<>( - TransportUpdateDataSourceAction.NAME, UpdateDataSourceActionResponse::new), - TransportUpdateDataSourceAction.class), - new ActionHandler<>( - new ActionType<>( - TransportPatchDataSourceAction.NAME, PatchDataSourceActionResponse::new), - TransportPatchDataSourceAction.class), + TransportDataSourceAction.NAME, DataSourceActionResponse::new), + TransportDataSourceAction.class), +// new ActionHandler<>( +// new ActionType<>( +// TransportPatchDataSourceAction.NAME, PatchDataSourceActionResponse::new), +// TransportPatchDataSourceAction.class), new ActionHandler<>( new ActionType<>( TransportDeleteDataSourceAction.NAME, DeleteDataSourceActionResponse::new),