diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java
index 0d2dc94bd4..e0c13ff005 100644
--- a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java
+++ b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java
@@ -29,6 +29,8 @@ public class GlueDataSourceFactory implements DataSourceFactory {
       "glue.indexstore.opensearch.auth.password";
   public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION =
       "glue.indexstore.opensearch.region";
+  /** Data source property key for the Lake Formation on/off flag. */
+  public static final String GLUE_LAKEFORMATION_ENABLED = "glue.lakeformation.enabled";
 
   @Override
   public DataSourceType getDataSourceType() {
diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java
index 3942c9a772..ac2213841d 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java
@@ -10,6 +10,7 @@
 import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME;
 import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION;
 import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI;
+import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED;
 import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN;
 import static org.opensearch.sql.spark.data.constants.SparkConstants.*;
 import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;
@@ -21,6 +22,7 @@
 import java.util.function.Supplier;
 import lombok.AllArgsConstructor;
 import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.text.StringEscapeUtils;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
 import org.opensearch.sql.datasource.model.DataSourceType;
@@ -117,6 +119,12 @@ public Builder dataSource(DataSourceMetadata metadata) {
         config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
         config.put(FLINT_DATA_SOURCE_KEY, metadata.getName());
 
+        // Always emit the flag: a missing or unrecognized property value is passed as "false".
+        config.put(
+            EMR_LAKEFORMATION_OPTION,
+            Boolean.toString(
+                BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED))));
+
         setFlintIndexStoreHost(
             parseUri(
                 metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), metadata.getName()));
diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java
index 0a574ef730..43aadb218e 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java
@@ -105,4 +105,8 @@ public class SparkConstants {
   public static final String SPARK_CATALOG_CATALOG_IMPL =
       "spark.sql.catalog.spark_catalog.catalog-impl";
   public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog";
+
+  /** Spark conf key for the EMR Serverless Lake Formation flag. */
+  public static final String EMR_LAKEFORMATION_OPTION =
+      "spark.emr-serverless.lakeformation.enabled";
 }
diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
index 1f250a0aea..424c1fc18c 100644
--- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
+++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
@@ -936,6 +936,15 @@ void testDispatchQueryWithExtraSparkSubmitParameters() {
 
   private String constructExpectedSparkSubmitParameterString(
       String auth, Map<String, String> authParams, String query) {
+    return constructExpectedSparkSubmitParameterString(auth, authParams, query, false);
+  }
+
+  /**
+   * Builds the expected spark-submit parameter string. {@code lakeFormationEnabled} is the value
+   * expected for the {@code spark.emr-serverless.lakeformation.enabled} conf entry.
+   */
+  private String constructExpectedSparkSubmitParameterString(
+      String auth, Map<String, String> authParams, String query, boolean lakeFormationEnabled) {
     StringBuilder authParamConfigBuilder = new StringBuilder();
     for (String key : authParams.keySet()) {
       authParamConfigBuilder.append(" --conf ");
@@ -979,6 +988,9 @@ private String constructExpectedSparkSubmitParameterString(
         + " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
         + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog "
         + " --conf spark.flint.datasource.name=my_glue "
+        + " --conf spark.emr-serverless.lakeformation.enabled="
+        + lakeFormationEnabled
+        + " "
         + authParamConfigBuilder
         + " --conf spark.flint.job.query="
         + query