Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Delete Spark datasource (#2638)" #2692

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions docs/user/ppl/admin/connectors/spark_connector.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
.. highlight:: sh

====================
Spark Connector
====================

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

This page covers spark connector properties for dataSource configuration
and the nuances associated with spark connector.


Spark Connector Properties in DataSource Configuration
========================================================
Spark Connector Properties.

* ``spark.connector`` [Required].
* This parameters provides the spark client information for connection.
* ``spark.sql.application`` [Optional].
* This parameters provides the spark sql application jar. Default value is ``s3://spark-datasource/sql-job.jar``.
* ``emr.cluster`` [Required].
* This parameters provides the emr cluster id information.
* ``emr.auth.type`` [Required]
* This parameters provides the authentication type information.
* Spark emr connector currently supports ``awssigv4`` authentication mechanism and following parameters are required.
* ``emr.auth.region``, ``emr.auth.access_key`` and ``emr.auth.secret_key``
* ``spark.datasource.flint.*`` [Optional]
* This parameters provides the Opensearch domain host information for flint integration.
* ``spark.datasource.flint.integration`` [Optional]
* Default value for integration jar is ``s3://spark-datasource/flint-spark-integration-assembly-0.3.0-SNAPSHOT.jar``.
* ``spark.datasource.flint.host`` [Optional]
* Default value for host is ``localhost``.
* ``spark.datasource.flint.port`` [Optional]
* Default value for port is ``9200``.
* ``spark.datasource.flint.scheme`` [Optional]
* Default value for scheme is ``http``.
* ``spark.datasource.flint.auth`` [Optional]
* Default value for auth is ``false``.
* ``spark.datasource.flint.region`` [Optional]
* Default value for auth is ``us-west-2``.

Example spark dataSource configuration
========================================

AWSSigV4 Auth::

[{
"name" : "my_spark",
"connector": "spark",
"properties" : {
"spark.connector": "emr",
"emr.cluster" : "{{clusterId}}",
"emr.auth.type" : "awssigv4",
"emr.auth.region" : "us-east-1",
"emr.auth.access_key" : "{{accessKey}}"
"emr.auth.secret_key" : "{{secretKey}}"
"spark.datasource.flint.host" : "{{opensearchHost}}",
"spark.datasource.flint.port" : "{{opensearchPort}}",
"spark.datasource.flint.scheme" : "{{opensearchScheme}}",
"spark.datasource.flint.auth" : "{{opensearchAuth}}",
"spark.datasource.flint.region" : "{{opensearchRegion}}",
}
}]


Spark SQL Support
==================

`sql` Function
----------------------------
Spark connector offers `sql` function. This function can be used to run spark sql query.
The function takes spark sql query as input. Argument should be either passed by name or positionArguments should be either passed by name or position.
`source=my_spark.sql('select 1')`
or
`source=my_spark.sql(query='select 1')`
Example::

> source=my_spark.sql('select 1')
+---+
| 1 |
|---+
| 1 |
+---+

Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
import org.opensearch.sql.spark.transport.TransportGetAsyncQueryResultAction;
Expand Down Expand Up @@ -282,6 +283,7 @@ private DataSourceServiceImpl createDataSourceService() {
new OpenSearchDataSourceFactory(
new OpenSearchNodeClient(this.client), pluginSettings))
.add(new PrometheusStorageFactory(pluginSettings))
.add(new SparkStorageFactory(this.client, pluginSettings))
.add(new GlueDataSourceFactory(pluginSettings))
.build(),
dataSourceMetadataStorage,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import lombok.SneakyThrows;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;

public class EmrClientImpl implements SparkClient {
private final AmazonElasticMapReduce emr;
private final String emrCluster;
private final FlintHelper flint;
private final String sparkApplicationJar;
private static final Logger logger = LogManager.getLogger(EmrClientImpl.class);
private SparkResponse sparkResponse;

/**
* Constructor for EMR Client Implementation.
*
* @param emr EMR helper
* @param flint Opensearch args for flint integration jar
* @param sparkResponse Response object to help with retrieving results from Opensearch index
*/
public EmrClientImpl(
AmazonElasticMapReduce emr,
String emrCluster,
FlintHelper flint,
SparkResponse sparkResponse,
String sparkApplicationJar) {
this.emr = emr;
this.emrCluster = emrCluster;
this.flint = flint;
this.sparkResponse = sparkResponse;
this.sparkApplicationJar =
sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar;
}

@Override
public JSONObject sql(String query) throws IOException {
runEmrApplication(query);
return sparkResponse.getResultFromOpensearchIndex();
}

@VisibleForTesting
void runEmrApplication(String query) {

HadoopJarStepConfig stepConfig =
new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs(
"spark-submit",
"--class",
"org.opensearch.sql.SQLJob",
"--jars",
flint.getFlintIntegrationJar(),
sparkApplicationJar,
query,
DEFAULT_RESULT_INDEX,
flint.getFlintHost(),
flint.getFlintPort(),
flint.getFlintScheme(),
flint.getFlintAuth(),
flint.getFlintRegion());

StepConfig emrstep =
new StepConfig()
.withName("Spark Application")
.withActionOnFailure(ActionOnFailure.CONTINUE)
.withHadoopJarStep(stepConfig);

AddJobFlowStepsRequest request =
new AddJobFlowStepsRequest().withJobFlowId(emrCluster).withSteps(emrstep);

AddJobFlowStepsResult result = emr.addJobFlowSteps(request);
logger.info("EMR step ID: " + result.getStepIds());

String stepId = result.getStepIds().get(0);
DescribeStepRequest stepRequest =
new DescribeStepRequest().withClusterId(emrCluster).withStepId(stepId);

waitForStepExecution(stepRequest);
sparkResponse.setValue(stepId);
}

@SneakyThrows
private void waitForStepExecution(DescribeStepRequest stepRequest) {
// Wait for the step to complete
boolean completed = false;
while (!completed) {
// Get the step status
StepStatus statusDetail = emr.describeStep(stepRequest).getStep().getStatus();
// Check if the step has completed
if (statusDetail.getState().equals("COMPLETED")) {
completed = true;
logger.info("EMR step completed successfully.");
} else if (statusDetail.getState().equals("FAILED")
|| statusDetail.getState().equals("CANCELLED")) {
logger.error("EMR step failed or cancelled.");
throw new RuntimeException("Spark SQL application failed.");
} else {
// Sleep for some time before checking the status again
Thread.sleep(2500);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import java.io.IOException;
import org.json.JSONObject;

/** Interface class for Spark Client. */
public interface SparkClient {
/**
* This method executes spark sql query.
*
* @param query spark sql query
* @return spark query response
*/
JSONObject sql(String query) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.functions.implementation;

import static org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver.QUERY;

import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.TableFunctionImplementation;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.request.SparkQueryRequest;
import org.opensearch.sql.spark.storage.SparkTable;
import org.opensearch.sql.storage.Table;

/** Spark SQL function implementation. */
public class SparkSqlFunctionImplementation extends FunctionExpression
implements TableFunctionImplementation {

private final FunctionName functionName;
private final List<Expression> arguments;
private final SparkClient sparkClient;

/**
* Constructor for spark sql function.
*
* @param functionName name of the function
* @param arguments a list of expressions
* @param sparkClient spark client
*/
public SparkSqlFunctionImplementation(
FunctionName functionName, List<Expression> arguments, SparkClient sparkClient) {
super(functionName, arguments);
this.functionName = functionName;
this.arguments = arguments;
this.sparkClient = sparkClient;
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
throw new UnsupportedOperationException(
String.format(
"Spark defined function [%s] is only "
+ "supported in SOURCE clause with spark connector catalog",
functionName));
}

@Override
public ExprType type() {
return ExprCoreType.STRUCT;
}

@Override
public String toString() {
List<String> args =
arguments.stream()
.map(
arg ->
String.format(
"%s=%s",
((NamedArgumentExpression) arg).getArgName(),
((NamedArgumentExpression) arg).getValue().toString()))
.collect(Collectors.toList());
return String.format("%s(%s)", functionName, String.join(", ", args));
}

@Override
public Table applyArguments() {
return new SparkTable(sparkClient, buildQueryFromSqlFunction(arguments));
}

/**
* This method builds a spark query request.
*
* @param arguments spark sql function arguments
* @return spark query request
*/
private SparkQueryRequest buildQueryFromSqlFunction(List<Expression> arguments) {

SparkQueryRequest sparkQueryRequest = new SparkQueryRequest();
arguments.forEach(
arg -> {
String argName = ((NamedArgumentExpression) arg).getArgName();
Expression argValue = ((NamedArgumentExpression) arg).getValue();
ExprValue literalValue = argValue.valueOf();
if (argName.equals(QUERY)) {
sparkQueryRequest.setSql((String) literalValue.value());
} else {
throw new ExpressionEvaluationException(
String.format("Invalid Function Argument:%s", argName));
}
});
return sparkQueryRequest;
}
}
Loading
Loading