Skip to content

Commit

Permalink
[Backport 2.x] deprecated job-metadata-index (#2340)
Browse files Browse the repository at this point in the history
* deprecated job-metadata-index (#2339)

* deprecate job-metadata-index

Signed-off-by: Peng Huo <[email protected]>

* upgrade log4j2

Signed-off-by: Peng Huo <[email protected]>

* update codestyle

Signed-off-by: Peng Huo <[email protected]>

* upgrade log4j

Signed-off-by: Peng Huo <[email protected]>

---------

Signed-off-by: Peng Huo <[email protected]>
(cherry picked from commit b30d3c9)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* revert log4j upgrade

Signed-off-by: Peng Huo <[email protected]>

---------

Signed-off-by: Peng Huo <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Peng Huo <[email protected]>
(cherry picked from commit f3fdead)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and penghuo committed Oct 23, 2023
1 parent 58a5ae5 commit 0f09aec
Show file tree
Hide file tree
Showing 25 changed files with 536 additions and 630 deletions.
2 changes: 1 addition & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ configurations.all {
resolutionStrategy.force "org.apache.httpcomponents:httpcore:4.4.13"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
}
}
6 changes: 3 additions & 3 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,9 @@ private DataSourceServiceImpl createDataSourceService() {
private AsyncQueryExecutorService createAsyncQueryExecutorService(
SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier,
SparkExecutionEngineConfig sparkExecutionEngineConfig) {
StateStore stateStore = new StateStore(client, clusterService);
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
new OpensearchAsyncQueryJobMetadataStorageService(client, clusterService);
new OpensearchAsyncQueryJobMetadataStorageService(stateStore);
EMRServerlessClient emrServerlessClient =
createEMRServerlessClient(sparkExecutionEngineConfig.getRegion());
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
Expand All @@ -319,8 +320,7 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client),
client,
new SessionManager(
new StateStore(client, clusterService), emrServerlessClient, pluginSettings));
new SessionManager(stateStore, emrServerlessClient, pluginSettings));
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ public CreateAsyncQueryResponse createAsyncQuery(
createAsyncQueryRequest.getSessionId()));
asyncQueryJobMetadataStorageService.storeJobMetadata(
new AsyncQueryJobMetadata(
dispatchQueryResponse.getQueryId(),
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.isDropIndexQuery(),
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId()));
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getJobId(), dispatchQueryResponse.getSessionId());
dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,166 +7,31 @@

package org.opensearch.sql.spark.asyncquery;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createJobMetaData;

import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.execution.statestore.StateStore;

/** Opensearch implementation of {@link AsyncQueryJobMetadataStorageService} */
@RequiredArgsConstructor
public class OpensearchAsyncQueryJobMetadataStorageService
implements AsyncQueryJobMetadataStorageService {

public static final String JOB_METADATA_INDEX = ".ql-job-metadata";
private static final String JOB_METADATA_INDEX_MAPPING_FILE_NAME =
"job-metadata-index-mapping.yml";
private static final String JOB_METADATA_INDEX_SETTINGS_FILE_NAME =
"job-metadata-index-settings.yml";
private static final Logger LOG = LogManager.getLogger();
private final Client client;
private final ClusterService clusterService;

/**
* This class implements JobMetadataStorageService interface using OpenSearch as underlying
* storage.
*
* @param client opensearch NodeClient.
* @param clusterService ClusterService.
*/
public OpensearchAsyncQueryJobMetadataStorageService(
Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
}
private final StateStore stateStore;

@Override
public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {
if (!this.clusterService.state().routingTable().hasIndex(JOB_METADATA_INDEX)) {
createJobMetadataIndex();
}
IndexRequest indexRequest = new IndexRequest(JOB_METADATA_INDEX);
indexRequest.id(asyncQueryJobMetadata.getJobId());
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<IndexResponse> indexResponseActionFuture;
IndexResponse indexResponse;
try (ThreadContext.StoredContext storedContext =
client.threadPool().getThreadContext().stashContext()) {
indexRequest.source(AsyncQueryJobMetadata.convertToXContent(asyncQueryJobMetadata));
indexResponseActionFuture = client.index(indexRequest);
indexResponse = indexResponseActionFuture.actionGet();
} catch (Exception e) {
throw new RuntimeException(e);
}

if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
LOG.debug("JobMetadata : {} successfully created", asyncQueryJobMetadata.getJobId());
} else {
throw new RuntimeException(
"Saving job metadata information failed with result : "
+ indexResponse.getResult().getLowercase());
}
AsyncQueryId queryId = asyncQueryJobMetadata.getQueryId();
createJobMetaData(stateStore, queryId.getDataSourceName()).apply(asyncQueryJobMetadata);
}

@Override
public Optional<AsyncQueryJobMetadata> getJobMetadata(String jobId) {
if (!this.clusterService.state().routingTable().hasIndex(JOB_METADATA_INDEX)) {
createJobMetadataIndex();
return Optional.empty();
}
return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId.keyword", jobId)).stream()
.findFirst();
}

private void createJobMetadataIndex() {
try {
InputStream mappingFileStream =
OpensearchAsyncQueryJobMetadataStorageService.class
.getClassLoader()
.getResourceAsStream(JOB_METADATA_INDEX_MAPPING_FILE_NAME);
InputStream settingsFileStream =
OpensearchAsyncQueryJobMetadataStorageService.class
.getClassLoader()
.getResourceAsStream(JOB_METADATA_INDEX_SETTINGS_FILE_NAME);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(JOB_METADATA_INDEX);
createIndexRequest
.mapping(IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML)
.settings(
IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML);
ActionFuture<CreateIndexResponse> createIndexResponseActionFuture;
try (ThreadContext.StoredContext ignored =
client.threadPool().getThreadContext().stashContext()) {
createIndexResponseActionFuture = client.admin().indices().create(createIndexRequest);
}
CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();
if (createIndexResponse.isAcknowledged()) {
LOG.info("Index: {} creation Acknowledged", JOB_METADATA_INDEX);
} else {
throw new RuntimeException("Index creation is not acknowledged.");
}
} catch (Throwable e) {
throw new RuntimeException(
"Internal server error while creating"
+ JOB_METADATA_INDEX
+ " index:: "
+ e.getMessage());
}
}

private List<AsyncQueryJobMetadata> searchInJobMetadataIndex(QueryBuilder query) {
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(JOB_METADATA_INDEX);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(query);
searchSourceBuilder.size(1);
searchRequest.source(searchSourceBuilder);
// https://github.com/opensearch-project/sql/issues/1801.
searchRequest.preference("_primary_first");
ActionFuture<SearchResponse> searchResponseActionFuture;
try (ThreadContext.StoredContext ignored =
client.threadPool().getThreadContext().stashContext()) {
searchResponseActionFuture = client.search(searchRequest);
}
SearchResponse searchResponse = searchResponseActionFuture.actionGet();
if (searchResponse.status().getStatus() != 200) {
throw new RuntimeException(
"Fetching job metadata information failed with status : " + searchResponse.status());
} else {
List<AsyncQueryJobMetadata> list = new ArrayList<>();
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
String sourceAsString = searchHit.getSourceAsString();
AsyncQueryJobMetadata asyncQueryJobMetadata;
try {
asyncQueryJobMetadata = AsyncQueryJobMetadata.toJobMetadata(sourceAsString);
} catch (IOException e) {
throw new RuntimeException(e);
}
list.add(asyncQueryJobMetadata);
}
return list;
}
public Optional<AsyncQueryJobMetadata> getJobMetadata(String qid) {
AsyncQueryId queryId = new AsyncQueryId(qid);
return StateStore.getJobMetaData(stateStore, queryId.getDataSourceName())
.apply(queryId.docId());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery.model;

import static org.opensearch.sql.spark.utils.IDUtils.decode;
import static org.opensearch.sql.spark.utils.IDUtils.encode;

import lombok.Data;

/** Async query id. */
@Data
public class AsyncQueryId {
private final String id;

public static AsyncQueryId newAsyncQueryId(String datasourceName) {
return new AsyncQueryId(encode(datasourceName));
}

public String getDataSourceName() {
return decode(id);
}

/** OpenSearch DocId. */
public String docId() {
return "qid" + id;
}

@Override
public String toString() {
return "asyncQueryId=" + id;

Check warning on line 33 in spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryId.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryId.java#L33

Added line #L33 was not covered by tests
}
}
Loading

0 comments on commit 0f09aec

Please sign in to comment.