Skip to content

Commit

Permalink
Add option to use LakeFormation in S3Glue data source.
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suresh <[email protected]>
  • Loading branch information
asuresh8 committed Apr 17, 2024
1 parent 204c7da commit 3c020f0
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class GlueDataSourceFactory implements DataSourceFactory {
"glue.indexstore.opensearch.auth.password";
public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION =
"glue.indexstore.opensearch.region";
public static final String GLUE_LAKEFORMATION_ENABLED = "glue.lakeformation.enabled";

@Override
public DataSourceType getDataSourceType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN;
import static org.opensearch.sql.spark.data.constants.SparkConstants.*;
import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;
Expand All @@ -21,6 +22,7 @@
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
Expand Down Expand Up @@ -117,6 +119,10 @@ public Builder dataSource(DataSourceMetadata metadata) {
config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
config.put(FLINT_DATA_SOURCE_KEY, metadata.getName());

if (BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED))) {
config.put(EMR_LAKEFORMATION_OPTION, Boolean.TRUE.toString());
}

setFlintIndexStoreHost(
parseUri(
metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), metadata.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,7 @@ public class SparkConstants {
public static final String SPARK_CATALOG_CATALOG_IMPL =
"spark.sql.catalog.spark_catalog.catalog-impl";
public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog";

public static final String EMR_LAKEFORMATION_OPTION =
"spark.emr-serverless.lakeformation.enabled";
}
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,11 @@ void testDispatchQueryWithExtraSparkSubmitParameters() {

private String constructExpectedSparkSubmitParameterString(
String auth, Map<String, String> authParams, String query) {
return constructExpectedSparkSubmitParameterString(auth, authParams, query, false);
}

private String constructExpectedSparkSubmitParameterString(
String auth, Map<String, String> authParams, String query, boolean lakeFormationEnabled) {
StringBuilder authParamConfigBuilder = new StringBuilder();
for (String key : authParams.keySet()) {
authParamConfigBuilder.append(" --conf ");
Expand Down Expand Up @@ -979,6 +984,7 @@ private String constructExpectedSparkSubmitParameterString(
+ " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
+ " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog "
+ " --conf spark.flint.datasource.name=my_glue "
+ (lakeFormationEnabled ? " --conf spark.emr-serverless.lakeformation.enabled=true " : "")
+ authParamConfigBuilder
+ " --conf spark.flint.job.query="
+ query
Expand Down

0 comments on commit 3c020f0

Please sign in to comment.