Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into pr-SearchAfter
Browse files Browse the repository at this point in the history
  • Loading branch information
penghuo committed Aug 5, 2024
2 parents 92b0fc2 + d3e54d4 commit eb69d2e
Show file tree
Hide file tree
Showing 36 changed files with 2,693 additions and 161 deletions.
6 changes: 6 additions & 0 deletions DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ can do so by running the following command:
```
sbt integtest/test
```
If you get integration test failures with error message "Previous attempts to find a Docker environment failed" in macOS, fix the issue by following the checklist:
1. Check you've installed Docker in your dev host. If not, install Docker first.
2. Check if the file /var/run/docker.sock exists. If not, go to `3`.
3. Run `sudo ln -s $HOME/.docker/desktop/docker.sock /var/run/docker.sock` or `sudo ln -s $HOME/.docker/run/docker.sock /var/run/docker.sock`
4. If you use Docker Desktop, as an alternative of `3`, check mark the "Allow the default Docker socket to be used (requires password)" in advanced settings of Docker Desktop.

### AWS Integration Test
The integration folder contains tests for cloud server providers. For instance, test against AWS OpenSearch domain, configure the following settings. The client will use the default credential provider to access the AWS OpenSearch domain.
```
Expand Down
13 changes: 12 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,11 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics
- index_name: user defined name for covering index and materialized view
- auto_refresh: auto refresh option of the index (true / false)
- status: status of the index
- **Extended Usage**: Display additional information, including the following output columns:
- error: error message if the index is in failed status

```sql
SHOW FLINT [INDEX|INDEXES] IN catalog[.database]
SHOW FLINT [INDEX|INDEXES] [EXTENDED] IN catalog[.database]
```

Example:
Expand All @@ -344,6 +346,15 @@ fetched rows / total rows = 3/3
| flint_spark_catalog_default_http_logs_skipping_index | skipping | default | http_logs | NULL | true | refreshing |
| flint_spark_catalog_default_http_logs_status_clientip_index | covering | default | http_logs | status_clientip | false | active |
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+
sql> SHOW FLINT INDEXES EXTENDED IN spark_catalog.default;
fetched rows / total rows = 2/2
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------+
| flint_index_name | kind | database | table | index_name | auto_refresh | status | error |
|-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------|
| flint_spark_catalog_default_http_count_view | mv | default | NULL | http_count_view | false | active | NULL |
| flint_spark_catalog_default_http_logs_skipping_index | skipping | default | http_logs | NULL | true | failed | failure in bulk execution:... |
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------+
```

- **Analyze Skipping Index**: Provides recommendation for creating skipping index. It outputs the following columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@ public class FlintOptions implements Serializable {

public static final String SCHEME = "scheme";

public static final String AUTH = "auth";
/**
* Service name used for SigV4 signature.
* `es`: Amazon OpenSearch Service
* `aoss`: Amazon OpenSearch Serverless
*/
public static final String SERVICE_NAME = "auth.servicename";
public static final String SERVICE_NAME_ES = "es";
public static final String SERVICE_NAME_AOSS = "aoss";

public static final String AUTH = "auth";
public static final String NONE_AUTH = "noauth";

public static final String SIGV4_AUTH = "sigv4";

public static final String BASIC_AUTH = "basic";

public static final String USERNAME = "auth.username";

public static final String PASSWORD = "auth.password";

public static final String CUSTOM_AWS_CREDENTIALS_PROVIDER = "customAWSCredentialsProvider";
Expand Down Expand Up @@ -140,6 +144,10 @@ public String getAuth() {
return options.getOrDefault(AUTH, NONE_AUTH);
}

public String getServiceName() {
return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES);
}

public String getCustomAwsCredentialsProvider() {
return options.getOrDefault(CUSTOM_AWS_CREDENTIALS_PROVIDER, DEFAULT_AWS_CREDENTIALS_PROVIDER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.opensearch.flint.core.auth;

import static com.amazonaws.auth.internal.SignerConstants.X_AMZ_CONTENT_SHA256;
import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS;

import com.amazonaws.DefaultRequest;
import com.amazonaws.auth.AWSCredentialsProvider;
Expand All @@ -31,6 +33,7 @@
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import org.opensearch.flint.core.storage.OpenSearchClientUtils;

/**
* From https://github.com/opensearch-project/sql-jdbc/blob/main/src/main/java/org/opensearch/jdbc/transport/http/auth/aws/AWSRequestSigningApacheInterceptor.java
Expand Down Expand Up @@ -74,13 +77,6 @@ public AWSRequestSigningApacheInterceptor(final String service,
@Override
public void process(final HttpRequest request, final HttpContext context)
throws HttpException, IOException {
URIBuilder uriBuilder;
try {
uriBuilder = new URIBuilder(request.getRequestLine().getUri());
} catch (URISyntaxException e) {
throw new IOException("Invalid URI" , e);
}

// Copy Apache HttpRequest to AWS DefaultRequest
DefaultRequest<?> signableRequest = new DefaultRequest<>(service);

Expand All @@ -91,7 +87,10 @@ public void process(final HttpRequest request, final HttpContext context)
final HttpMethodName httpMethod =
HttpMethodName.fromValue(request.getRequestLine().getMethod());
signableRequest.setHttpMethod(httpMethod);

URIBuilder uriBuilder;
try {
uriBuilder = new URIBuilder(request.getRequestLine().getUri());
signableRequest.setResourcePath(uriBuilder.build().getRawPath());
} catch (URISyntaxException e) {
throw new IOException("Invalid URI" , e);
Expand All @@ -110,6 +109,10 @@ public void process(final HttpRequest request, final HttpContext context)
signableRequest.setParameters(nvpToMapParams(uriBuilder.getQueryParams()));
signableRequest.setHeaders(headerArrayToMap(request.getAllHeaders()));

if (SERVICE_NAME_AOSS.equals(service)) {
enableContentBodySignature(signableRequest);
}

// Sign it
signer.sign(signableRequest, awsCredentialsProvider.getCredentials());

Expand All @@ -126,6 +129,11 @@ public void process(final HttpRequest request, final HttpContext context)
}
}

private void enableContentBodySignature(DefaultRequest<?> signableRequest) {
// AWS4Signer will add `x-amz-content-sha256` header when this header is set
signableRequest.addHeader(X_AMZ_CONTENT_SHA256, "required");
}

/**
*
* @param params list of HTTP query params as NameValuePairs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint.core.auth;

import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES;

import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.http.HttpException;
Expand All @@ -14,6 +16,7 @@
import org.apache.http.protocol.HttpContext;
import org.jetbrains.annotations.TestOnly;
import org.opensearch.common.Strings;
import org.opensearch.flint.core.storage.OpenSearchClientUtils;
import software.amazon.awssdk.authcrt.signer.AwsCrtV4aSigner;

import java.io.IOException;
Expand Down Expand Up @@ -84,7 +87,7 @@ public ResourceBasedAWSRequestSigningApacheInterceptor(final String service,
@Override
public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
String resourcePath = parseUriToPath(request);
if ("es".equals(this.service) && isMetadataAccess(resourcePath)) {
if (SERVICE_NAME_ES.equals(this.service) && isMetadataAccess(resourcePath)) {
metadataAccessInterceptor.process(request, context);
} else {
primaryInterceptor.process(request, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
*/
public class OpenSearchClientUtils {

private static final String SERVICE_NAME = "es";

/**
* Metadata log index name prefix
*/
Expand Down Expand Up @@ -90,7 +88,7 @@ private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClient
restClientBuilder.setHttpClientConfigCallback(builder -> {
HttpAsyncClientBuilder delegate = builder.addInterceptorLast(
new ResourceBasedAWSRequestSigningApacheInterceptor(
SERVICE_NAME, options.getRegion(), customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), systemIndexName));
options.getServiceName(), options.getRegion(), customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), systemIndexName));
return RetryableHttpAsyncClient.builder(delegate, options);
}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package org.opensearch.flint.core.auth;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.awssdk.auth.signer.internal.SignerConstant.X_AMZ_CONTENT_SHA256;

import com.amazonaws.DefaultRequest;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.Signer;
import com.amazonaws.http.HttpMethodName;
import com.amazonaws.util.IOUtils;
import java.io.IOException;
import java.net.URI;
import org.apache.http.HttpHost;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpCoreContext;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.utils.StringInputStream;

@ExtendWith(MockitoExtension.class)
class AWSRequestSigningApacheInterceptorTest {

@Mock
AWSCredentialsProvider awsCredentialsProvider;
@Mock Signer signer;
@Mock
AWSCredentials awsCredentials;

@Captor
ArgumentCaptor<DefaultRequest<?>> signableRequestCaptor;

@Test
public void testProcessWithServiceIsEs() throws Exception {
AWSRequestSigningApacheInterceptor awsRequestSigningApacheInterceptor = new AWSRequestSigningApacheInterceptor("es", signer, awsCredentialsProvider);
final BasicHttpEntityEnclosingRequest request = getRequestWithEntity();
final BasicHttpContext context = getContext();
when(awsCredentialsProvider.getCredentials()).thenReturn(awsCredentials);

awsRequestSigningApacheInterceptor.process(request, context);

verify(signer).sign(signableRequestCaptor.capture(), eq(awsCredentials));
DefaultRequest<?> signableRequest = signableRequestCaptor.getValue();
assertEquals(new URI("http://hello.world"), signableRequest.getEndpoint());
assertEquals(HttpMethodName.POST, signableRequest.getHttpMethod());
assertEquals("/path", signableRequest.getResourcePath());
assertEquals("ENTITY", IOUtils.toString(signableRequest.getContent()));
assertEquals("HeaderValue", signableRequest.getHeaders().get("Test-Header"));
assertEquals("value0", signableRequest.getParameters().get("param0").get(0));
}

@Test
public void testProcessWithoutEntity() throws Exception {
AWSRequestSigningApacheInterceptor awsRequestSigningApacheInterceptor = new AWSRequestSigningApacheInterceptor("es", signer, awsCredentialsProvider);
final BasicHttpEntityEnclosingRequest request = getRequest();
final BasicHttpContext context = getContext();
when(awsCredentialsProvider.getCredentials()).thenReturn(awsCredentials);

awsRequestSigningApacheInterceptor.process(request, context);

verify(signer).sign(signableRequestCaptor.capture(), eq(awsCredentials));
DefaultRequest<?> signableRequest = signableRequestCaptor.getValue();
assertEquals("", IOUtils.toString(signableRequest.getContent()));
}

@NotNull
private static BasicHttpContext getContext() {
BasicHttpContext context = new BasicHttpContext();
context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, new HttpHost("hello.world"));
return context;
}

@Test
public void testProcessWithServiceIsAoss() throws Exception {
AWSRequestSigningApacheInterceptor awsRequestSigningApacheInterceptor = new AWSRequestSigningApacheInterceptor("aoss", signer, awsCredentialsProvider);
final BasicHttpEntityEnclosingRequest request = getRequest();
final BasicHttpContext context = getContext();
when(awsCredentialsProvider.getCredentials()).thenReturn(awsCredentials);

awsRequestSigningApacheInterceptor.process(request, context);

verify(signer).sign(signableRequestCaptor.capture(), eq(awsCredentials));
DefaultRequest<?> signableRequest = signableRequestCaptor.getValue();
assertEquals("required", signableRequest.getHeaders().get(X_AMZ_CONTENT_SHA256));
}

@Test
public void testInvalidURI() throws Exception {
AWSRequestSigningApacheInterceptor awsRequestSigningApacheInterceptor = new AWSRequestSigningApacheInterceptor("aoss", signer, awsCredentialsProvider);
final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "::INVALID_URI::");
final BasicHttpContext context = getContext();

assertThrows(IOException.class, () -> {
awsRequestSigningApacheInterceptor.process(request, context);
});
}

@NotNull
private static BasicHttpEntityEnclosingRequest getRequestWithEntity() {
BasicHttpEntityEnclosingRequest request = getRequest();
BasicHttpEntity basicHttpEntity = new BasicHttpEntity();
basicHttpEntity.setContent(new StringInputStream("ENTITY"));
request.setEntity(basicHttpEntity);
request.setHeader("content-length", "6");
return request;
}

@NotNull
private static BasicHttpEntityEnclosingRequest getRequest() {
BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "https://hello.world/path?param0=value0");
request.setHeader("Test-Header", "HeaderValue");
request.setHeader("content-length", "0");
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ indexManagementStatement
;

showFlintIndexStatement
: SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
: SHOW FLINT (INDEX | INDEXES)
IN catalogDb=multipartIdentifier #showFlintIndex
| SHOW FLINT (INDEX | INDEXES) EXTENDED
IN catalogDb=multipartIdentifier #showFlintIndexExtended
;

indexJobManagementStatement
Expand Down
1 change: 1 addition & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
EXTENDED: 'EXTENDED';
FALSE: 'FALSE';
FLINT: 'FLINT';
IF: 'IF';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ object FlintSparkConf {
"noauth(no auth), sigv4(sigv4 auth), basic(basic auth)")
.createWithDefault(FlintOptions.NONE_AUTH)

val SERVICE_NAME = FlintConfig("spark.datasource.flint.auth.servicename")
.datasourceOption()
.doc("service name used for SigV4 signature. " +
"es (AWS OpenSearch Service), aoss (Amazon OpenSearch Serverless)")
.createWithDefault(FlintOptions.SERVICE_NAME_ES)

val USERNAME = FlintConfig("spark.datasource.flint.auth.username")
.datasourceOption()
.doc("basic auth username")
Expand Down Expand Up @@ -271,6 +277,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
RETRYABLE_HTTP_STATUS_CODES,
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
SERVICE_NAME,
USERNAME,
PASSWORD,
SOCKET_TIMEOUT_MILLIS,
Expand Down
Loading

0 comments on commit eb69d2e

Please sign in to comment.