Add option to use LakeFormation in S3Glue data source. (#2624)
* Add option to use LakeFormation in S3Glue data source.

Signed-off-by: Adi Suresh <[email protected]>

* Update s3glue_connector.rst

Corrected a formatting issue.

Signed-off-by: Vamsi Manohar <[email protected]>

---------

Signed-off-by: Adi Suresh <[email protected]>
Signed-off-by: Vamsi Manohar <[email protected]>
Co-authored-by: Vamsi Manohar <[email protected]>
asuresh8 and vmmusings authored Apr 29, 2024
1 parent e578a57 commit ea08c8f
Showing 5 changed files with 95 additions and 7 deletions.
@@ -29,6 +29,7 @@ public class GlueDataSourceFactory implements DataSourceFactory {
       "glue.indexstore.opensearch.auth.password";
   public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION =
       "glue.indexstore.opensearch.region";
+  public static final String GLUE_LAKEFORMATION_ENABLED = "glue.lakeformation.enabled";
 
   @Override
   public DataSourceType getDataSourceType() {
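The new constant names the datasource property that switches the feature on. As a minimal sketch of the lookup it enables, assuming a plain properties map (the isLakeFormationEnabled helper is hypothetical, not part of this commit):

import java.util.Map;
import org.apache.commons.lang3.BooleanUtils;

class GlueLakeFormationProperty {
  static final String GLUE_LAKEFORMATION_ENABLED = "glue.lakeformation.enabled";

  // Hypothetical helper: absent or unrecognized values fall back to false,
  // so LakeFormation stays disabled unless explicitly opted into.
  static boolean isLakeFormationEnabled(Map<String, String> properties) {
    return BooleanUtils.toBoolean(properties.get(GLUE_LAKEFORMATION_ENABLED));
  }
}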
8 changes: 5 additions & 3 deletions docs/user/ppl/admin/connectors/s3glue_connector.rst
@@ -18,7 +18,7 @@ s3Glue connector provides a way to query s3 files using glue as metadata store a
 This page covers s3Glue datasource configuration and also how to query an s3Glue datasource.
 
 Required resources for s3 Glue Connector
-===================================
+========================================
 * ``EMRServerless Spark Execution Engine Config Setting``: Since we execute s3Glue queries on top of spark execution engine, we require this configuration.
   More details: `ExecutionEngine Config <../../../interfaces/asyncqueryinterface.rst#id2>`_
 * ``S3``: This is where the data lies.
@@ -42,6 +42,7 @@ Glue Connector Properties.
 * Basic Auth required ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password``
 * AWSSigV4 Auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn``
 * ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth]
+* ``glue.lakeformation.enabled`` determines whether LakeFormation is enabled for queries. Defaults to ``"false"`` if not specified.

Sample Glue dataSource configuration
========================================
@@ -56,7 +57,7 @@ Glue datasource configuration::
         "glue.auth.role_arn": "role_arn",
         "glue.indexstore.opensearch.uri": "http://localhost:9200",
         "glue.indexstore.opensearch.auth" :"basicauth",
-        "glue.indexstore.opensearch.auth.username" :"username"
+        "glue.indexstore.opensearch.auth.username" :"username",
         "glue.indexstore.opensearch.auth.password" :"password"
     },
     "resultIndex": "query_execution_result"
@@ -71,6 +72,7 @@ Glue datasource configuration::
         "glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200",
         "glue.indexstore.opensearch.auth" :"awssigv4",
         "glue.indexstore.opensearch.auth.region" :"awssigv4",
+        "glue.lakeformation.enabled": "true"
     },
     "resultIndex": "query_execution_result"
 }]
@@ -86,4 +88,4 @@ Sample Queries
 
 These queries work only on top of async queries. Documentation: `Async Query APIs <../../../interfaces/asyncqueryinterface.rst>`_
 
-Documentation for Index Queries: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md
\ No newline at end of file
+Documentation for Index Queries: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md
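For illustration, once a datasource like the samples above exists, a query can be submitted through the async query API the docs link to; a request of roughly this shape (endpoint and body fields as described in the Async Query APIs documentation, response omitted):

POST /_plugins/_async_query
{
  "datasource": "my_glue",
  "lang": "sql",
  "query": "select * from my_glue.default.http_logs limit 10"
}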
@@ -10,6 +10,7 @@
 import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME;
 import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION;
 import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI;
+import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED;
 import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN;
 import static org.opensearch.sql.spark.data.constants.SparkConstants.*;
 import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;
@@ -21,6 +22,7 @@
 import java.util.function.Supplier;
 import lombok.AllArgsConstructor;
 import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.text.StringEscapeUtils;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
 import org.opensearch.sql.datasource.model.DataSourceType;
@@ -112,6 +114,11 @@ public Builder dataSource(DataSourceMetadata metadata) {
       config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
       config.put(FLINT_DATA_SOURCE_KEY, metadata.getName());
 
+      final boolean lakeFormationEnabled =
+          BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED));
+      config.put(EMR_LAKEFORMATION_OPTION, Boolean.toString(lakeFormationEnabled));
+      config.put(FLINT_ACCELERATE_USING_COVERING_INDEX, Boolean.toString(!lakeFormationEnabled));
+
       setFlintIndexStoreHost(
           parseUri(
               metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), metadata.getName()));
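The builder leans on BooleanUtils.toBoolean to give glue.lakeformation.enabled its documented default of false; a quick standalone check of that behavior (a sketch for reference, not part of the commit):

import org.apache.commons.lang3.BooleanUtils;

class LakeFormationDefaultCheck {
  public static void main(String[] args) {
    // Unset property: null parses to false, so LakeFormation is off by default.
    System.out.println(BooleanUtils.toBoolean((String) null)); // false
    // Explicit opt-in parses case-insensitively.
    System.out.println(BooleanUtils.toBoolean("TRUE")); // true
    // Unrecognized values also resolve to false rather than throwing.
    System.out.println(BooleanUtils.toBoolean("enabled")); // false
  }
}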
@@ -106,4 +106,9 @@ public class SparkConstants {
   public static final String SPARK_CATALOG_CATALOG_IMPL =
       "spark.sql.catalog.spark_catalog.catalog-impl";
   public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog";
+
+  public static final String EMR_LAKEFORMATION_OPTION =
+      "spark.emr-serverless.lakeformation.enabled";
+  public static final String FLINT_ACCELERATE_USING_COVERING_INDEX =
+      "spark.flint.optimizer.covering.enabled";
 }
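When the dispatcher assembles the spark-submit parameter string (see the test expectations below), these two constants surface as a pair with opposite values; for a LakeFormation-enabled datasource the relevant fragment would read (a sketch assembled from the constants above, not captured output):

--conf spark.emr-serverless.lakeformation.enabled=true --conf spark.flint.optimizer.covering.enabled=false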
@@ -168,6 +168,52 @@ void testDispatchSelectQuery() {
     verifyNoInteractions(flintIndexMetadataService);
   }
 
+  @Test
+  void testDispatchSelectQueryWithLakeFormation() {
+    HashMap<String, String> tags = new HashMap<>();
+    tags.put(DATASOURCE_TAG_KEY, "my_glue");
+    tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
+    tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
+    String query = "select * from my_glue.default.http_logs";
+    String sparkSubmitParameters =
+        constructExpectedSparkSubmitParameterString(
+            "sigv4",
+            new HashMap<>() {
+              {
+                put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
+              }
+            },
+            query,
+            true);
+    StartJobRequest expected =
+        new StartJobRequest(
+            "TEST_CLUSTER:batch",
+            EMRS_APPLICATION_ID,
+            EMRS_EXECUTION_ROLE,
+            sparkSubmitParameters,
+            tags,
+            false,
+            "query_execution_result_my_glue");
+    when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID);
+    DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithLakeFormation();
+    when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue"))
+        .thenReturn(dataSourceMetadata);
+
+    DispatchQueryResponse dispatchQueryResponse =
+        sparkQueryDispatcher.dispatch(
+            new DispatchQueryRequest(
+                EMRS_APPLICATION_ID,
+                query,
+                "my_glue",
+                LangType.SQL,
+                EMRS_EXECUTION_ROLE,
+                TEST_CLUSTER_NAME));
+    verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
+    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    verifyNoInteractions(flintIndexMetadataService);
+  }
+
   @Test
   void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
     HashMap<String, String> tags = new HashMap<>();
@@ -936,13 +982,17 @@ void testDispatchQueryWithExtraSparkSubmitParameters() {
 
   private String constructExpectedSparkSubmitParameterString(
       String auth, Map<String, String> authParams, String query) {
+    return constructExpectedSparkSubmitParameterString(auth, authParams, query, false);
+  }
+
+  private String constructExpectedSparkSubmitParameterString(
+      String auth, Map<String, String> authParams, String query, boolean lakeFormationEnabled) {
     StringBuilder authParamConfigBuilder = new StringBuilder();
     for (String key : authParams.keySet()) {
-      authParamConfigBuilder.append(" --conf ");
+      authParamConfigBuilder.append(" --conf ");
       authParamConfigBuilder.append(key);
       authParamConfigBuilder.append("=");
       authParamConfigBuilder.append(authParams.get(key));
       authParamConfigBuilder.append(" ");
     }
     query = "\"" + query + "\"";
     return " --class org.apache.spark.sql.FlintJob --conf"
@@ -978,9 +1028,13 @@ private String constructExpectedSparkSubmitParameterString(
         + " --conf"
         + " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
         + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog "
-        + " --conf spark.flint.datasource.name=my_glue "
+        + " --conf spark.flint.datasource.name=my_glue --conf"
+        + " spark.emr-serverless.lakeformation.enabled="
+        + Boolean.toString(lakeFormationEnabled)
+        + " --conf spark.flint.optimizer.covering.enabled="
+        + Boolean.toString(!lakeFormationEnabled)
         + authParamConfigBuilder
-        + " --conf spark.flint.job.query="
+        + " --conf spark.flint.job.query="
         + query
         + " ";
   }
@@ -1056,6 +1110,25 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBadURISyntax() {
         .build();
   }
 
+  private DataSourceMetadata constructMyGlueDataSourceMetadataWithLakeFormation() {
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("glue.auth.type", "iam_role");
+    properties.put(
+        "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole");
+    properties.put(
+        "glue.indexstore.opensearch.uri",
+        "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com");
+    properties.put("glue.indexstore.opensearch.auth", "awssigv4");
+    properties.put("glue.indexstore.opensearch.region", "eu-west-1");
+    properties.put("glue.lakeformation.enabled", "true");
+    return new DataSourceMetadata.Builder()
+        .setName("my_glue")
+        .setConnector(DataSourceType.S3GLUE)
+        .setProperties(properties)
+        .build();
+  }
+
   private DataSourceMetadata constructPrometheusDataSourceType() {
     return new DataSourceMetadata.Builder()
         .setName("my_prometheus")
