From ea08c8f7ad2c00509da56b5bbf0798fab5544d60 Mon Sep 17 00:00:00 2001 From: Adi Suresh Date: Mon, 29 Apr 2024 17:50:18 -0500 Subject: [PATCH] Add option to use LakeFormation in S3Glue data source. (#2624) * Add option to use LakeFormation in S3Glue data source. Signed-off-by: Adi Suresh * Update s3glue_connector.rst corrected formatting issue. Signed-off-by: Vamsi Manohar --------- Signed-off-by: Adi Suresh Signed-off-by: Vamsi Manohar Co-authored-by: Vamsi Manohar --- .../glue/GlueDataSourceFactory.java | 1 + .../ppl/admin/connectors/s3glue_connector.rst | 8 +- .../model/SparkSubmitParameters.java | 7 ++ .../spark/data/constants/SparkConstants.java | 5 ++ .../dispatcher/SparkQueryDispatcherTest.java | 81 ++++++++++++++++++- 5 files changed, 95 insertions(+), 7 deletions(-) diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java index 0d2dc94bd4..e0c13ff005 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java @@ -29,6 +29,7 @@ public class GlueDataSourceFactory implements DataSourceFactory { "glue.indexstore.opensearch.auth.password"; public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION = "glue.indexstore.opensearch.region"; + public static final String GLUE_LAKEFORMATION_ENABLED = "glue.lakeformation.enabled"; @Override public DataSourceType getDataSourceType() { diff --git a/docs/user/ppl/admin/connectors/s3glue_connector.rst b/docs/user/ppl/admin/connectors/s3glue_connector.rst index 190ab08d42..5e91df70e5 100644 --- a/docs/user/ppl/admin/connectors/s3glue_connector.rst +++ b/docs/user/ppl/admin/connectors/s3glue_connector.rst @@ -18,7 +18,7 @@ s3Glue connector provides a way to query s3 files using glue as metadata store a This page covers s3Glue datasource configuration and also how to query and s3Glue datasource. Required resources for s3 Glue Connector -=================================== +======================================== * ``EMRServerless Spark Execution Engine Config Setting``: Since we execute s3Glue queries on top of spark execution engine, we require this configuration. More details: `ExecutionEngine Config <../../../interfaces/asyncqueryinterface.rst#id2>`_ * ``S3``: This is where the data lies. @@ -42,6 +42,7 @@ Glue Connector Properties. * Basic Auth required ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password`` * AWSSigV4 Auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn`` * ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth] +* ``glue.lakeformation.enabled`` determines whether to enable lakeformation for queries. Default value is ``"false"`` if not specified Sample Glue dataSource configuration ======================================== @@ -56,7 +57,7 @@ Glue datasource configuration:: "glue.auth.role_arn": "role_arn", "glue.indexstore.opensearch.uri": "http://localhost:9200", "glue.indexstore.opensearch.auth" :"basicauth", - "glue.indexstore.opensearch.auth.username" :"username" + "glue.indexstore.opensearch.auth.username" :"username", "glue.indexstore.opensearch.auth.password" :"password" }, "resultIndex": "query_execution_result" @@ -71,6 +72,7 @@ Glue datasource configuration:: "glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200", "glue.indexstore.opensearch.auth" :"awssigv4", "glue.indexstore.opensearch.auth.region" :"awssigv4", + "glue.lakeformation.enabled": "true" }, "resultIndex": "query_execution_result" }] @@ -86,4 +88,4 @@ Sample Queries These queries would work only top of async queries. Documentation: `Async Query APIs <../../../interfaces/asyncqueryinterface.rst>`_ -Documentation for Index Queries: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md \ No newline at end of file +Documentation for Index Queries: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index e400e0a9ea..314e83a6db 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -10,6 +10,7 @@ import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN; import static org.opensearch.sql.spark.data.constants.SparkConstants.*; import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; @@ -21,6 +22,7 @@ import java.util.function.Supplier; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.text.StringEscapeUtils; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; @@ -112,6 +114,11 @@ public Builder dataSource(DataSourceMetadata metadata) { config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG); config.put(FLINT_DATA_SOURCE_KEY, metadata.getName()); + final boolean lakeFormationEnabled = + BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED)); + config.put(EMR_LAKEFORMATION_OPTION, Boolean.toString(lakeFormationEnabled)); + config.put(FLINT_ACCELERATE_USING_COVERING_INDEX, Boolean.toString(!lakeFormationEnabled)); + setFlintIndexStoreHost( parseUri( metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), metadata.getName())); diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 507b774a14..92feba9941 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -106,4 +106,9 @@ public class SparkConstants { public static final String SPARK_CATALOG_CATALOG_IMPL = "spark.sql.catalog.spark_catalog.catalog-impl"; public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog"; + + public static final String EMR_LAKEFORMATION_OPTION = + "spark.emr-serverless.lakeformation.enabled"; + public static final String FLINT_ACCELERATE_USING_COVERING_INDEX = + "spark.flint.optimizer.covering.enabled"; } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 3bec6edcdb..bdadbc13df 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -168,6 +168,52 @@ void testDispatchSelectQuery() { verifyNoInteractions(flintIndexMetadataService); } + @Test + void testDispatchSelectQueryWithLakeFormation() { + HashMap tags = new HashMap<>(); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); + String query = "select * from my_glue.default.http_logs"; + String sparkSubmitParameters = + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }, + query, + true); + StartJobRequest expected = + new StartJobRequest( + "TEST_CLUSTER:batch", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + sparkSubmitParameters, + tags, + false, + "query_execution_result_my_glue"); + when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithLakeFormation(); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); + Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + verifyNoInteractions(flintIndexMetadataService); + } + @Test void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); @@ -936,13 +982,17 @@ void testDispatchQueryWithExtraSparkSubmitParameters() { private String constructExpectedSparkSubmitParameterString( String auth, Map authParams, String query) { + return constructExpectedSparkSubmitParameterString(auth, authParams, query, false); + } + + private String constructExpectedSparkSubmitParameterString( + String auth, Map authParams, String query, boolean lakeFormationEnabled) { StringBuilder authParamConfigBuilder = new StringBuilder(); for (String key : authParams.keySet()) { - authParamConfigBuilder.append(" --conf "); + authParamConfigBuilder.append(" --conf "); authParamConfigBuilder.append(key); authParamConfigBuilder.append("="); authParamConfigBuilder.append(authParams.get(key)); - authParamConfigBuilder.append(" "); } query = "\"" + query + "\""; return " --class org.apache.spark.sql.FlintJob --conf" @@ -978,9 +1028,13 @@ private String constructExpectedSparkSubmitParameterString( + " --conf" + " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog " - + " --conf spark.flint.datasource.name=my_glue " + + " --conf spark.flint.datasource.name=my_glue --conf" + + " spark.emr-serverless.lakeformation.enabled=" + + Boolean.toString(lakeFormationEnabled) + + " --conf spark.flint.optimizer.covering.enabled=" + + Boolean.toString(!lakeFormationEnabled) + authParamConfigBuilder - + " --conf spark.flint.job.query=" + + " --conf spark.flint.job.query=" + query + " "; } @@ -1056,6 +1110,25 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBadURISyntax() { .build(); } + private DataSourceMetadata constructMyGlueDataSourceMetadataWithLakeFormation() { + + Map properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put( + "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"); + properties.put( + "glue.indexstore.opensearch.uri", + "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"); + properties.put("glue.indexstore.opensearch.auth", "awssigv4"); + properties.put("glue.indexstore.opensearch.region", "eu-west-1"); + properties.put("glue.lakeformation.enabled", "true"); + return new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); + } + private DataSourceMetadata constructPrometheusDataSourceType() { return new DataSourceMetadata.Builder() .setName("my_prometheus")