Skip to content

Commit

Permalink
Remove OpenSearch specific id mapping from model classes
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed Jun 3, 2024
1 parent 328e198 commit 9bfe5ef
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 64 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery;
Expand All @@ -12,42 +10,46 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.xcontent.AsyncQueryJobMetadataXContentSerializer;
import org.opensearch.sql.spark.utils.IDUtils;

/** Opensearch implementation of {@link AsyncQueryJobMetadataStorageService} */
/** OpenSearch implementation of {@link AsyncQueryJobMetadataStorageService} */
@RequiredArgsConstructor
public class OpensearchAsyncQueryJobMetadataStorageService
public class OpenSearchAsyncQueryJobMetadataStorageService
implements AsyncQueryJobMetadataStorageService {

private final StateStore stateStore;
private final AsyncQueryJobMetadataXContentSerializer asyncQueryJobMetadataXContentSerializer;

private static final Logger LOGGER =
LogManager.getLogger(OpensearchAsyncQueryJobMetadataStorageService.class);
LogManager.getLogger(OpenSearchAsyncQueryJobMetadataStorageService.class);

@Override
public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {
stateStore.create(
mapIdToDocumentId(asyncQueryJobMetadata.getId()),
asyncQueryJobMetadata,
AsyncQueryJobMetadata::copy,
OpenSearchStateStoreUtil.getIndexName(asyncQueryJobMetadata.getDatasourceName()));
}

private String mapIdToDocumentId(String id) {
return "qid" + id;
}

@Override
public Optional<AsyncQueryJobMetadata> getJobMetadata(String qid) {
public Optional<AsyncQueryJobMetadata> getJobMetadata(String queryId) {
try {
AsyncQueryId queryId = new AsyncQueryId(qid);
return stateStore.get(
queryId.docId(),
mapIdToDocumentId(queryId),
asyncQueryJobMetadataXContentSerializer::fromXContent,
OpenSearchStateStoreUtil.getIndexName(queryId.getDataSourceName()));
OpenSearchStateStoreUtil.getIndexName(IDUtils.decode(queryId)));
} catch (Exception e) {
LOGGER.error("Error while fetching the job metadata.", e);
throw new AsyncQueryNotFoundException(String.format("Invalid QueryId: %s", qid));
throw new AsyncQueryNotFoundException(String.format("Invalid QueryId: %s", queryId));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery.model;
Expand Down Expand Up @@ -59,6 +57,6 @@ public static AsyncQueryJobMetadata copy(

@Override
public String getId() {
return "qid" + queryId;
return queryId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

package org.opensearch.sql.spark.dispatcher;

import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.utils.IDUtils;

/** Generates QueryId by embedding Datasource name and random UUID */
public class DatasourceEmbeddedQueryIdProvider implements QueryIdProvider {

@Override
public String getQueryId(DispatchQueryRequest dispatchQueryRequest) {
return AsyncQueryId.newAsyncQueryId(dispatchQueryRequest.getDatasource()).getId();
return IDUtils.encode(dispatchQueryRequest.getDatasource());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
@SuperBuilder
@EqualsAndHashCode(callSuper = false)
public class IndexDMLResult extends StateModel {
public static final String DOC_ID_PREFIX = "index";

private final String queryId;
private final String status;
private final String error;
Expand All @@ -39,6 +37,6 @@ public static IndexDMLResult copy(IndexDMLResult copy, ImmutableMap<String, Obje

@Override
public String getId() {
return DOC_ID_PREFIX + queryId;
return queryId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class OpenSearchSessionStorageService implements SessionStorageService {
@Override
public SessionModel createSession(SessionModel sessionModel) {
return stateStore.create(
sessionModel.getId(),
sessionModel,
SessionModel::of,
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class OpenSearchStatementStorageService implements StatementStorageServic
@Override
public StatementModel createStatement(StatementModel statementModel) {
return stateStore.create(
statementModel.getId(),
statementModel,
StatementModel::copy,
OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import org.apache.commons.io.IOUtils;
Expand Down Expand Up @@ -78,15 +77,16 @@ public class StateStore {
private final ClusterService clusterService;

@VisibleForTesting
public <T extends StateModel> T create(T st, CopyBuilder<T> builder, String indexName) {
public <T extends StateModel> T create(
String docId, T st, CopyBuilder<T> builder, String indexName) {
try {
if (!this.clusterService.state().routingTable().hasIndex(indexName)) {
createIndex(indexName);
}
XContentSerializer<T> serializer = getXContentSerializer(st);
IndexRequest indexRequest =
new IndexRequest(indexName)
.id(st.getId())
.id(docId)
.source(serializer.toXContent(st, ToXContent.EMPTY_PARAMS))
.setIfSeqNo(getSeqNo(st))
.setIfPrimaryTerm(getPrimaryTerm(st))
Expand Down Expand Up @@ -268,26 +268,6 @@ private String loadConfigFromResource(String fileName) throws IOException {
return IOUtils.toString(fileStream, StandardCharsets.UTF_8);
}

public static Function<AsyncQueryJobMetadata, AsyncQueryJobMetadata> createJobMetaData(
StateStore stateStore, String datasourceName) {
return (jobMetadata) ->
stateStore.create(
jobMetadata,
AsyncQueryJobMetadata::copy,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
}

public static Function<String, Optional<AsyncQueryJobMetadata>> getJobMetaData(
StateStore stateStore, String datasourceName) {
AsyncQueryJobMetadataXContentSerializer asyncQueryJobMetadataXContentSerializer =
new AsyncQueryJobMetadataXContentSerializer();
return (docId) ->
stateStore.get(
docId,
asyncQueryJobMetadataXContentSerializer::fromXContent,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
}

public static Supplier<Long> activeSessionsCount(StateStore stateStore, String datasourceName) {
return () ->
stateStore.count(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public Optional<FlintIndexStateModel> getFlintIndexStateModel(String id, String
public FlintIndexStateModel createFlintIndexStateModel(
FlintIndexStateModel flintIndexStateModel) {
return stateStore.create(
flintIndexStateModel.getId(),
flintIndexStateModel,
FlintIndexStateModel::copy,
OpenSearchStateStoreUtil.getIndexName(flintIndexStateModel.getDatasourceName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ public class OpenSearchIndexDMLResultStorageService implements IndexDMLResultSto
public IndexDMLResult createIndexDMLResult(IndexDMLResult result) {
DataSourceMetadata dataSourceMetadata =
dataSourceService.getDataSourceMetadata(result.getDatasourceName());
return stateStore.create(result, IndexDMLResult::copy, dataSourceMetadata.getResultIndex());
return stateStore.create(
mapIdToDocumentId(result.getId()),
result,
IndexDMLResult::copy,
dataSourceMetadata.getResultIndex());
}

private String mapIdToDocumentId(String id) {
return "index" + id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.OpenSearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.client.EMRServerlessClientFactoryImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
Expand Down Expand Up @@ -69,7 +69,7 @@ public AsyncQueryExecutorService asyncQueryExecutorService(
@Provides
public AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService(
StateStore stateStore, AsyncQueryJobMetadataXContentSerializer serializer) {
return new OpensearchAsyncQueryJobMetadataStorageService(stateStore, serializer);
return new OpenSearchAsyncQueryJobMetadataStorageService(stateStore, serializer);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService(
JobExecutionResponseReader jobExecutionResponseReader) {
StateStore stateStore = new StateStore(client, clusterService);
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
new OpensearchAsyncQueryJobMetadataStorageService(
new OpenSearchAsyncQueryJobMetadataStorageService(
stateStore, new AsyncQueryJobMetadataXContentSerializer());
QueryHandlerFactory queryHandlerFactory =
new QueryHandlerFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,25 @@
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.xcontent.AsyncQueryJobMetadataXContentSerializer;
import org.opensearch.sql.spark.utils.IDUtils;
import org.opensearch.test.OpenSearchIntegTestCase;

public class OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest
public class OpenSearchAsyncQueryJobMetadataStorageServiceTest
extends OpenSearchIntegTestCase {

public static final String DS_NAME = "mys3";
private static final String MOCK_SESSION_ID = "sessionId";
private static final String MOCK_RESULT_INDEX = "resultIndex";
private static final String MOCK_QUERY_ID = "00fdo6u94n7abo0q";
private OpensearchAsyncQueryJobMetadataStorageService opensearchJobMetadataStorageService;
private OpenSearchAsyncQueryJobMetadataStorageService openSearchJobMetadataStorageService;

@Before
public void setup() {
opensearchJobMetadataStorageService =
new OpensearchAsyncQueryJobMetadataStorageService(
openSearchJobMetadataStorageService =
new OpenSearchAsyncQueryJobMetadataStorageService(
new StateStore(client(), clusterService()),
new AsyncQueryJobMetadataXContentSerializer());
}
Expand All @@ -40,16 +40,16 @@ public void setup() {
public void testStoreJobMetadata() {
AsyncQueryJobMetadata expected =
AsyncQueryJobMetadata.builder()
.queryId(AsyncQueryId.newAsyncQueryId(DS_NAME).getId())
.queryId(IDUtils.encode(DS_NAME))
.jobId(EMR_JOB_ID)
.applicationId(EMRS_APPLICATION_ID)
.resultIndex(MOCK_RESULT_INDEX)
.datasourceName(DS_NAME)
.build();

opensearchJobMetadataStorageService.storeJobMetadata(expected);
openSearchJobMetadataStorageService.storeJobMetadata(expected);
Optional<AsyncQueryJobMetadata> actual =
opensearchJobMetadataStorageService.getJobMetadata(expected.getQueryId());
openSearchJobMetadataStorageService.getJobMetadata(expected.getQueryId());

assertTrue(actual.isPresent());
assertEquals(expected, actual.get());
Expand All @@ -61,17 +61,17 @@ public void testStoreJobMetadata() {
public void testStoreJobMetadataWithResultExtraData() {
AsyncQueryJobMetadata expected =
AsyncQueryJobMetadata.builder()
.queryId(AsyncQueryId.newAsyncQueryId(DS_NAME).getId())
.queryId(IDUtils.encode(DS_NAME))
.jobId(EMR_JOB_ID)
.applicationId(EMRS_APPLICATION_ID)
.resultIndex(MOCK_RESULT_INDEX)
.sessionId(MOCK_SESSION_ID)
.datasourceName(DS_NAME)
.build();

opensearchJobMetadataStorageService.storeJobMetadata(expected);
openSearchJobMetadataStorageService.storeJobMetadata(expected);
Optional<AsyncQueryJobMetadata> actual =
opensearchJobMetadataStorageService.getJobMetadata(expected.getQueryId());
openSearchJobMetadataStorageService.getJobMetadata(expected.getQueryId());

assertTrue(actual.isPresent());
assertEquals(expected, actual.get());
Expand All @@ -84,7 +84,7 @@ public void testGetJobMetadataWithMalformedQueryId() {
AsyncQueryNotFoundException asyncQueryNotFoundException =
Assertions.assertThrows(
AsyncQueryNotFoundException.class,
() -> opensearchJobMetadataStorageService.getJobMetadata(MOCK_QUERY_ID));
() -> openSearchJobMetadataStorageService.getJobMetadata(MOCK_QUERY_ID));
Assertions.assertEquals(
String.format("Invalid QueryId: %s", MOCK_QUERY_ID),
asyncQueryNotFoundException.getMessage());
Expand All @@ -95,7 +95,7 @@ public void testGetJobMetadataWithEmptyQueryId() {
AsyncQueryNotFoundException asyncQueryNotFoundException =
Assertions.assertThrows(
AsyncQueryNotFoundException.class,
() -> opensearchJobMetadataStorageService.getJobMetadata(""));
() -> openSearchJobMetadataStorageService.getJobMetadata(""));
Assertions.assertEquals("Invalid QueryId: ", asyncQueryNotFoundException.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ void getFlintIndexStateModel() {

@Test
void createFlintIndexStateModel() {
when(mockStateStore.create(any(), any(), any())).thenReturn(responseFlintIndexStateModel);
when(mockStateStore.create(any(), any(), any(), any()))
.thenReturn(responseFlintIndexStateModel);
when(flintIndexStateModel.getDatasourceName()).thenReturn(DATASOURCE);

FlintIndexStateModel result =
Expand Down

0 comments on commit 9bfe5ef

Please sign in to comment.