Skip to content

Commit

Permalink
Add create Pit api and fix sigv4 bug (#434)
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo authored Jul 19, 2024
1 parent 43b14f4 commit 26759b9
Show file tree
Hide file tree
Showing 16 changed files with 250 additions and 46 deletions.
18 changes: 18 additions & 0 deletions DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,24 @@ can do so by running the following command:
```
sbt integtest/test
```
### AWS Integration Test
The integration folder contains tests for cloud server providers. For instance, test against AWS OpenSearch domain, configure the following settings. The client will use the default credential provider to access the AWS OpenSearch domain.
```
export AWS_OPENSEARCH_HOST=search-xxx.aos.us-west-2.on.aws
export AWS_REGION=us-west-2
```
And run the
```
sbt integtest/integration
[info] AWSOpenSearchAccessTestSuite:
[info] - should Create Pit on AWS OpenSearch
[info] Run completed in 3 seconds, 116 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

## Scala Formatting Guidelines

Expand Down
20 changes: 18 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Dependencies._

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
lazy val opensearchVersion = "2.6.0"
lazy val opensearchVersion = "2.11.1"
lazy val icebergVersion = "1.5.0"

val scalaMinorVersion = scala212.split("\\.").take(2).mkString(".")
Expand Down Expand Up @@ -36,6 +36,8 @@ lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")
// Run as part of test task.
lazy val testScalastyle = taskKey[Unit]("testScalastyle")



lazy val commonSettings = Seq(
javacOptions ++= Seq("-source", "11"),
Compile / compile / javacOptions ++= Seq("-target", "11"),
Expand Down Expand Up @@ -63,6 +65,10 @@ lazy val flintCore = (project in file("flint-core"))
"org.opensearch.client" % "opensearch-rest-client" % opensearchVersion,
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion
exclude ("org.apache.logging.log4j", "log4j-api"),
"org.opensearch.client" % "opensearch-java" % opensearchVersion
exclude ("com.fasterxml.jackson.core", "jackson-databind")
exclude ("com.fasterxml.jackson.core", "jackson-core")
exclude ("org.apache.httpcomponents.client5", "httpclient5"),
"dev.failsafe" % "failsafe" % "3.3.2",
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
Expand All @@ -86,6 +92,7 @@ lazy val flintCore = (project in file("flint-core"))
libraryDependencies ++= deps(sparkVersion),
publish / skip := true)


lazy val flintCommons = (project in file("flint-commons"))
.settings(
commonSettings,
Expand Down Expand Up @@ -197,13 +204,18 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
},
assembly / test := (Test / test).value)

lazy val IntegrationTest = config("it") extend Test

// Test assembly package with integration test.
lazy val integtest = (project in file("integ-test"))
.dependsOn(flintCommons % "test->test", flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
.settings(
commonSettings,
name := "integ-test",
scalaVersion := scala212,
inConfig(IntegrationTest)(Defaults.testSettings),
IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
IntegrationTest / parallelExecution := false,
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
Expand All @@ -219,7 +231,11 @@ lazy val integtest = (project in file("integ-test"))
libraryDependencies ++= deps(sparkVersion),
Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value,
(sparkSqlApplication / assembly).value
))
),
IntegrationTest / dependencyClasspath ++= (Test / dependencyClasspath).value,
integration := (IntegrationTest / test).value,
)
lazy val integration = taskKey[Unit]("Run integration tests")

lazy val standaloneCosmetic = project
.settings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.flint.core;

import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
Expand All @@ -20,14 +22,14 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.opensearch.core.pit.CreatePitResponse;
import org.opensearch.client.opensearch.core.pit.CreatePitRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.flint.core.logging.CustomLogging;
import org.opensearch.flint.core.logging.OperationMessage;
import org.opensearch.flint.core.metrics.MetricsUtil;
Expand Down Expand Up @@ -67,6 +69,7 @@ public interface IRestHighLevelClient extends Closeable {

DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException;

CreatePitResponse createPit(CreatePitRequest request) throws IOException;

/**
* Records the success of an OpenSearch operation by incrementing the corresponding metric counter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.pit.CreatePitResponse;
import org.opensearch.client.opensearch.core.pit.CreatePitRequest;
import org.opensearch.client.transport.rest_client.RestClientTransport;

import java.io.IOException;

Expand All @@ -40,6 +45,8 @@
public class RestHighLevelClientWrapper implements IRestHighLevelClient {
private final RestHighLevelClient client;

private final static JacksonJsonpMapper JACKSON_MAPPER = new JacksonJsonpMapper();

/**
* Constructs a new RestHighLevelClientWrapper.
*
Expand Down Expand Up @@ -114,6 +121,11 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options));
}

@Override
public CreatePitResponse createPit(CreatePitRequest request) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request));
}

/**
* Executes a given operation, tracks metrics, and handles exceptions.
*
Expand Down Expand Up @@ -148,4 +160,9 @@ private interface IOCallable<T> {
public void close() throws IOException {
client.close();
}

private OpenSearchClient openSearchClient() {
return new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),JACKSON_MAPPER
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.Signer;
import com.amazonaws.http.HttpMethodName;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -100,6 +102,9 @@ public void process(final HttpRequest request, final HttpContext context)
(HttpEntityEnclosingRequest) request;
if (httpEntityEnclosingRequest.getEntity() != null) {
signableRequest.setContent(httpEntityEnclosingRequest.getEntity().getContent());
} else {
// known issue of AWS Sigv4 signer. https://github.com/aws/aws-sdk-java/issues/2078
signableRequest.setContent(new ByteArrayInputStream(new byte[0]));
}
}
signableRequest.setParameters(nvpToMapParams(uriBuilder.getQueryParams()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.protocol.HttpContext;
import org.jetbrains.annotations.TestOnly;
import org.opensearch.common.Strings;
import org.opensearch.core.common.Strings;
import software.amazon.awssdk.authcrt.signer.AwsCrtV4aSigner;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ package org.opensearch.flint.core.metadata

import java.nio.charset.StandardCharsets.UTF_8

import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent._
import org.opensearch.common.xcontent.json.JsonXContent
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.xcontent.{DeprecationHandler, NamedXContentRegistry, XContentBuilder, XContentParser}

/**
* JSON parsing and building helper.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,18 @@

package org.opensearch.flint.core.storage;

import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.google.common.base.Strings;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
Expand All @@ -40,6 +28,19 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import scala.Option;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;

/**
* Flint client implementation for OpenSearch storage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.Strings;
import org.opensearch.core.common.Strings;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.RestHighLevelClientWrapper;
Expand All @@ -37,7 +37,10 @@ public class OpenSearchClientUtils {
*/
public final static String META_LOG_NAME_PREFIX = ".query_execution_request";

public static IRestHighLevelClient createClient(FlintOptions options) {
/**
* Used in IT.
*/
public static RestHighLevelClient createRestHighLevelClient(FlintOptions options) {
RestClientBuilder
restClientBuilder =
RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme()));
Expand All @@ -53,7 +56,11 @@ public static IRestHighLevelClient createClient(FlintOptions options) {
final RequestConfigurator callback = new RequestConfigurator(options);
restClientBuilder.setRequestConfigCallback(callback);

return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder));
return new RestHighLevelClient(restClientBuilder);
}

public static IRestHighLevelClient createClient(FlintOptions options) {
return new RestHighLevelClientWrapper(createRestHighLevelClient(options));
}

private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClientBuilder, FlintOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,14 @@

package org.opensearch.flint.core.storage;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

import static org.opensearch.flint.core.metrics.MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.Strings;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.rest.RestStatus;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.aws

import org.opensearch.client.opensearch._types.Time
import org.opensearch.client.opensearch.core.pit.CreatePitRequest
import org.opensearch.flint.core.storage.OpenSearchClientUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class AWSOpenSearchAccessTestSuite
extends AnyFlatSpec
with BeforeAndAfter
with Matchers
with AWSOpenSearchSuite {

it should "Create Pit on AWS OpenSearch" in {
val indexName = "t00001"
withIndexName(indexName) {
simpleIndex(indexName)

val pit = OpenSearchClientUtils
.createClient(options)
.createPit(
new CreatePitRequest.Builder()
.targetIndexes(indexName)
.keepAlive(new Time.Builder().time("10s").build())
.build())
pit.pitId() should not be ""
}
}
}
Loading

0 comments on commit 26759b9

Please sign in to comment.