diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index f6a0f2bd5..153cf5ed6 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -12,6 +12,24 @@ can do so by running the following command: ``` sbt integtest/test ``` +### AWS Integration Test +The integration folder contains tests for cloud server providers. For instance, test against AWS OpenSearch domain, configure the following settings. The client will use the default credential provider to access the AWS OpenSearch domain. +``` +export AWS_OPENSEARCH_HOST=search-xxx.aos.us-west-2.on.aws +export AWS_REGION=us-west-2 +``` +And run the +``` +sbt integtest/integration + +[info] AWSOpenSearchAccessTestSuite: +[info] - should Create Pit on AWS OpenSearch +[info] Run completed in 3 seconds, 116 milliseconds. +[info] Total number of tests run: 1 +[info] Suites: completed 1, aborted 0 +[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0 +[info] All tests passed. +``` ## Scala Formatting Guidelines diff --git a/build.sbt b/build.sbt index e21687204..c190c43f3 100644 --- a/build.sbt +++ b/build.sbt @@ -6,7 +6,7 @@ import Dependencies._ lazy val scala212 = "2.12.14" lazy val sparkVersion = "3.3.2" -lazy val opensearchVersion = "2.6.0" +lazy val opensearchVersion = "2.11.1" lazy val icebergVersion = "1.5.0" val scalaMinorVersion = scala212.split("\\.").take(2).mkString(".") @@ -36,6 +36,8 @@ lazy val compileScalastyle = taskKey[Unit]("compileScalastyle") // Run as part of test task. lazy val testScalastyle = taskKey[Unit]("testScalastyle") + + lazy val commonSettings = Seq( javacOptions ++= Seq("-source", "11"), Compile / compile / javacOptions ++= Seq("-target", "11"), @@ -63,6 +65,10 @@ lazy val flintCore = (project in file("flint-core")) "org.opensearch.client" % "opensearch-rest-client" % opensearchVersion, "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion exclude ("org.apache.logging.log4j", "log4j-api"), + "org.opensearch.client" % "opensearch-java" % opensearchVersion + exclude ("com.fasterxml.jackson.core", "jackson-databind") + exclude ("com.fasterxml.jackson.core", "jackson-core") + exclude ("org.apache.httpcomponents.client5", "httpclient5"), "dev.failsafe" % "failsafe" % "3.3.2", "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" exclude ("com.fasterxml.jackson.core", "jackson-databind"), @@ -86,6 +92,7 @@ lazy val flintCore = (project in file("flint-core")) libraryDependencies ++= deps(sparkVersion), publish / skip := true) + lazy val flintCommons = (project in file("flint-commons")) .settings( commonSettings, @@ -197,6 +204,8 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration")) }, assembly / test := (Test / test).value) +lazy val IntegrationTest = config("it") extend Test + // Test assembly package with integration test. lazy val integtest = (project in file("integ-test")) .dependsOn(flintCommons % "test->test", flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test") @@ -204,6 +213,9 @@ lazy val integtest = (project in file("integ-test")) commonSettings, name := "integ-test", scalaVersion := scala212, + inConfig(IntegrationTest)(Defaults.testSettings), + IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala", + IntegrationTest / parallelExecution := false, libraryDependencies ++= Seq( "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" exclude ("com.fasterxml.jackson.core", "jackson-databind"), @@ -219,7 +231,11 @@ lazy val integtest = (project in file("integ-test")) libraryDependencies ++= deps(sparkVersion), Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value, (sparkSqlApplication / assembly).value - )) + ), + IntegrationTest / dependencyClasspath ++= (Test / dependencyClasspath).value, + integration := (IntegrationTest / test).value, + ) +lazy val integration = taskKey[Unit]("Run integration tests") lazy val standaloneCosmetic = project .settings( diff --git a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java index 5c1080f8c..e81508078 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java @@ -6,6 +6,8 @@ package org.opensearch.flint.core; import org.opensearch.OpenSearchException; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.delete.DeleteRequest; @@ -20,14 +22,14 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.action.update.UpdateRequest; -import org.opensearch.action.DocWriteResponse; +import org.opensearch.client.RequestOptions; import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.CreateIndexResponse; +import org.opensearch.client.opensearch.core.pit.CreatePitResponse; +import org.opensearch.client.opensearch.core.pit.CreatePitRequest; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; import org.opensearch.client.indices.PutMappingRequest; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.client.RequestOptions; import org.opensearch.flint.core.logging.CustomLogging; import org.opensearch.flint.core.logging.OperationMessage; import org.opensearch.flint.core.metrics.MetricsUtil; @@ -67,6 +69,7 @@ public interface IRestHighLevelClient extends Closeable { DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException; + CreatePitResponse createPit(CreatePitRequest request) throws IOException; /** * Records the success of an OpenSearch operation by incrementing the corresponding metric counter. diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java index fa5696f50..272380e59 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java @@ -28,6 +28,11 @@ import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; import org.opensearch.client.indices.PutMappingRequest; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.core.pit.CreatePitResponse; +import org.opensearch.client.opensearch.core.pit.CreatePitRequest; +import org.opensearch.client.transport.rest_client.RestClientTransport; import java.io.IOException; @@ -40,6 +45,8 @@ public class RestHighLevelClientWrapper implements IRestHighLevelClient { private final RestHighLevelClient client; + private final static JacksonJsonpMapper JACKSON_MAPPER = new JacksonJsonpMapper(); + /** * Constructs a new RestHighLevelClientWrapper. * @@ -114,6 +121,11 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options)); } + @Override + public CreatePitResponse createPit(CreatePitRequest request) throws IOException { + return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request)); + } + /** * Executes a given operation, tracks metrics, and handles exceptions. * @@ -148,4 +160,9 @@ private interface IOCallable { public void close() throws IOException { client.close(); } + + private OpenSearchClient openSearchClient() { + return new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),JACKSON_MAPPER + )); + } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java b/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java index dd5fd78bc..a3925999e 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java @@ -11,6 +11,8 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.Signer; import com.amazonaws.http.HttpMethodName; + +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -100,6 +102,9 @@ public void process(final HttpRequest request, final HttpContext context) (HttpEntityEnclosingRequest) request; if (httpEntityEnclosingRequest.getEntity() != null) { signableRequest.setContent(httpEntityEnclosingRequest.getEntity().getContent()); + } else { + // known issue of AWS Sigv4 signer. https://github.com/aws/aws-sdk-java/issues/2078 + signableRequest.setContent(new ByteArrayInputStream(new byte[0])); } } signableRequest.setParameters(nvpToMapParams(uriBuilder.getQueryParams())); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/auth/ResourceBasedAWSRequestSigningApacheInterceptor.java b/flint-core/src/main/scala/org/opensearch/flint/core/auth/ResourceBasedAWSRequestSigningApacheInterceptor.java index b69343730..66f2e359c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/auth/ResourceBasedAWSRequestSigningApacheInterceptor.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/auth/ResourceBasedAWSRequestSigningApacheInterceptor.java @@ -13,7 +13,7 @@ import org.apache.http.client.utils.URIBuilder; import org.apache.http.protocol.HttpContext; import org.jetbrains.annotations.TestOnly; -import org.opensearch.common.Strings; +import org.opensearch.core.common.Strings; import software.amazon.awssdk.authcrt.signer.AwsCrtV4aSigner; import java.io.IOException; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintJsonHelper.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintJsonHelper.scala index 4c1991edc..8cdf4a2af 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintJsonHelper.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintJsonHelper.scala @@ -7,9 +7,10 @@ package org.opensearch.flint.core.metadata import java.nio.charset.StandardCharsets.UTF_8 -import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent._ import org.opensearch.common.xcontent.json.JsonXContent +import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.xcontent.{DeprecationHandler, NamedXContentRegistry, XContentBuilder, XContentParser} /** * JSON parsing and building helper. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index e5b06ae5f..08e606ea5 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -5,18 +5,7 @@ package org.opensearch.flint.core.storage; -import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.function.Function; -import java.util.logging.Logger; -import java.util.stream.Collectors; +import com.google.common.base.Strings; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.indices.CreateIndexRequest; @@ -24,11 +13,10 @@ import org.opensearch.client.indices.GetIndexResponse; import org.opensearch.client.indices.PutMappingRequest; import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.common.Strings; import org.opensearch.common.settings.Settings; -import org.opensearch.common.xcontent.NamedXContentRegistry; -import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; @@ -40,6 +28,19 @@ import org.opensearch.search.builder.SearchSourceBuilder; import scala.Option; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; + /** * Flint client implementation for OpenSearch storage. */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java index c047ced51..e002e132b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java @@ -18,7 +18,7 @@ import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; -import org.opensearch.common.Strings; +import org.opensearch.core.common.Strings; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.flint.core.RestHighLevelClientWrapper; @@ -37,7 +37,10 @@ public class OpenSearchClientUtils { */ public final static String META_LOG_NAME_PREFIX = ".query_execution_request"; - public static IRestHighLevelClient createClient(FlintOptions options) { + /** + * Used in IT. + */ + public static RestHighLevelClient createRestHighLevelClient(FlintOptions options) { RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme())); @@ -53,7 +56,11 @@ public static IRestHighLevelClient createClient(FlintOptions options) { final RequestConfigurator callback = new RequestConfigurator(options); restClientBuilder.setRequestConfigCallback(callback); - return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder)); + return new RestHighLevelClient(restClientBuilder); + } + + public static IRestHighLevelClient createClient(FlintOptions options) { + return new RestHighLevelClientWrapper(createRestHighLevelClient(options)); } private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClientBuilder, FlintOptions options) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchQueryReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchQueryReader.java index 19ce6ce8b..8914a3af9 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchQueryReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchQueryReader.java @@ -5,22 +5,14 @@ package org.opensearch.flint.core.storage; -import org.opensearch.OpenSearchStatusException; -import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; -import org.opensearch.common.Strings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.Optional; -import java.util.logging.Level; import java.util.logging.Logger; import static org.opensearch.flint.core.metrics.MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index d3b53c2a6..3c671e77d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -11,15 +11,14 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.common.Strings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.Strings; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.Optional; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java index d0875d492..f4c1cb0c9 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java @@ -11,8 +11,8 @@ import org.opensearch.action.bulk.BulkResponse; import org.opensearch.client.RequestOptions; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.rest.RestStatus; import org.opensearch.flint.core.IRestHighLevelClient; -import org.opensearch.rest.RestStatus; import java.io.ByteArrayOutputStream; import java.io.IOException; diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/aws/AWSOpenSearchAccessTestSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/aws/AWSOpenSearchAccessTestSuite.scala new file mode 100644 index 000000000..937f94b9f --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/aws/AWSOpenSearchAccessTestSuite.scala @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.aws + +import org.opensearch.client.opensearch._types.Time +import org.opensearch.client.opensearch.core.pit.CreatePitRequest +import org.opensearch.flint.core.storage.OpenSearchClientUtils +import org.scalatest.BeforeAndAfter +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class AWSOpenSearchAccessTestSuite + extends AnyFlatSpec + with BeforeAndAfter + with Matchers + with AWSOpenSearchSuite { + + it should "Create Pit on AWS OpenSearch" in { + val indexName = "t00001" + withIndexName(indexName) { + simpleIndex(indexName) + + val pit = OpenSearchClientUtils + .createClient(options) + .createPit( + new CreatePitRequest.Builder() + .targetIndexes(indexName) + .keepAlive(new Time.Builder().time("10s").build()) + .build()) + pit.pitId() should not be "" + } + } +} diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/aws/AWSOpenSearchSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/aws/AWSOpenSearchSuite.scala new file mode 100644 index 000000000..a497895d7 --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/aws/AWSOpenSearchSuite.scala @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.aws + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.support.WriteRequest.RefreshPolicy +import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest} +import org.opensearch.common.xcontent.XContentType +import org.opensearch.flint.core.FlintOptions +import org.opensearch.flint.core.storage.OpenSearchClientUtils + +trait AWSOpenSearchSuite { + + lazy val testHost: String = System.getenv("AWS_OPENSEARCH_HOST") + lazy val testPort: Int = -1 + lazy val testRegion: String = System.getenv("AWS_REGION") + lazy val testScheme: String = "https" + lazy val testAuth: String = "sigv4" + + lazy val options: FlintOptions = new FlintOptions(openSearchOptions.asJava) + + protected lazy val openSearchClient = OpenSearchClientUtils.createRestHighLevelClient(options) + + protected lazy val openSearchOptions = + Map( + s"${FlintOptions.HOST}" -> testHost, + s"${FlintOptions.PORT}" -> s"$testPort", + s"${FlintOptions.SCHEME}" -> testScheme, + s"${FlintOptions.REGION}" -> testRegion, + s"${FlintOptions.AUTH}" -> testAuth) + + val oneNodeSetting = """{ + | "number_of_shards": "1", + | "number_of_replicas": "0" + |}""".stripMargin + + /** + * Delete index `indexNames` after calling `f`. + */ + protected def withIndexName(indexNames: String*)(f: => Unit): Unit = { + try { + f + } finally { + indexNames.foreach { indexName => + openSearchClient + .indices() + .delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT) + } + } + } + + def simpleIndex(indexName: String): Unit = { + val mappings = """{ + | "properties": { + | "accountId": { + | "type": "keyword" + | }, + | "eventName": { + | "type": "keyword" + | }, + | "eventSource": { + | "type": "keyword" + | } + | } + |}""".stripMargin + val docs = Seq("""{ + | "accountId": "123", + | "eventName": "event", + | "eventSource": "source" + |}""".stripMargin) + index(indexName, oneNodeSetting, mappings, docs) + } + + def index(index: String, settings: String, mappings: String, docs: Seq[String]): Unit = { + openSearchClient.indices.create( + new CreateIndexRequest(index) + .settings(settings, XContentType.JSON) + .mapping(mappings, XContentType.JSON), + RequestOptions.DEFAULT) + + val getIndexResponse = + openSearchClient.indices().get(new GetIndexRequest(index), RequestOptions.DEFAULT) + assume(getIndexResponse.getIndices.contains(index), s"create index $index failed") + + /** + * 1. Wait until refresh the index. + */ + if (docs.nonEmpty) { + val request = new BulkRequest().setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) + for (doc <- docs) { + request.add(new IndexRequest(index).source(doc, XContentType.JSON)) + } + + val response = + openSearchClient.bulk(request, RequestOptions.DEFAULT) + + assume( + !response.hasFailures, + s"bulk index docs to $index failed: ${response.buildFailureMessage()}") + } + } +} diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 8cad8844b..43d997671 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -17,7 +17,7 @@ import scala.util.control.NonFatal import com.codahale.metrics.Timer import org.json4s.native.Serialization import org.opensearch.action.get.GetResponse -import org.opensearch.common.Strings +import org.opensearch.core.common.Strings import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.logging.CustomLogging import org.opensearch.flint.core.metrics.MetricConstants diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index f5e4ec2be..155f43e84 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -12,14 +12,13 @@ import java.util.Locale import scala.util.{Failure, Success, Try} import org.opensearch.action.get.{GetRequest, GetResponse} -import org.opensearch.action.search.{SearchRequest, SearchResponse} -import org.opensearch.client.{RequestOptions, RestHighLevelClient} -import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest, GetIndexResponse} -import org.opensearch.client.indices.CreateIndexRequest -import org.opensearch.common.Strings +import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest} import org.opensearch.common.settings.Settings -import org.opensearch.common.xcontent.{NamedXContentRegistry, XContentParser, XContentType} -import org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.common.Strings +import org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS +import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions, IRestHighLevelClient} import org.opensearch.flint.core.metrics.MetricConstants import org.opensearch.flint.core.storage.{FlintReader, OpenSearchQueryReader, OpenSearchScrollReader, OpenSearchUpdater} @@ -35,8 +34,8 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { val flintClient: FlintClient = FlintClientBuilder.build(flintOptions) /** - * {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link - * QueryBuilder} from DSL query string. + * {@link org.opensearch.core.xcontent.NamedXContentRegistry} from {@link SearchModule} used for + * construct {@link QueryBuilder} from DSL query string. */ private val xContentRegistry: NamedXContentRegistry = new NamedXContentRegistry( new SearchModule(Settings.builder.build, new ArrayList[SearchPlugin]).getNamedXContents)