Skip to content

Commit

Permalink
[Coral-Spark] Refactor CoralSpark API (#440)
Browse files Browse the repository at this point in the history
* [Coral-Spark] [Backward Incompatible] Refactor CoralSpark API

* remove version upgrade

* address PR comments
  • Loading branch information
aastha25 authored Jul 28, 2023
1 parent 8c7bd53 commit 4956719
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class IncrementalUtils {
/**
 * Translates a user's Hive SQL query into an incremental Spark SQL query.
 *
 * @param query Hive SQL query to incrementalize and translate.
 * @return Spark SQL string for the incremental version of the query.
 */
public static String getSparkIncrementalQueryFromUserSql(String query) {
  // Parse the Hive query into a Coral IR RelNode.
  RelNode originalNode = new HiveToRelConverter(hiveMetastoreClient).convertSql(query);
  // Rewrite the plan into its incremental form.
  RelNode incrementalRelNode = RelNodeIncrementalTransformer.convertRelIncremental(originalNode);
  // Use the two-arg factory introduced by the API refactor; the rendered diff also showed the
  // removed one-arg CoralSpark.create(RelNode) call, which would redeclare `coralSpark`.
  CoralSpark coralSpark = CoralSpark.create(incrementalRelNode, hiveMetastoreClient);
  return coralSpark.getSparkSql();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class TranslationUtils {

/**
 * Translates a Trino SQL query into Spark SQL.
 *
 * @param query Trino SQL query to translate.
 * @return equivalent Spark SQL string.
 */
public static String translateTrinoToSpark(String query) {
  RelNode relNode = new TrinoToRelConverter(hiveMetastoreClient).convertSql(query);
  // Keep only the post-refactor two-arg factory call; the diff residue also contained the
  // removed one-arg overload, which would duplicate the `coralSpark` declaration.
  CoralSpark coralSpark = CoralSpark.create(relNode, hiveMetastoreClient);
  return coralSpark.getSparkSql();
}

Expand All @@ -30,7 +30,7 @@ public static String translateHiveToTrino(String query) {

/**
 * Translates a Hive SQL query into Spark SQL.
 *
 * @param query Hive SQL query to translate.
 * @return equivalent Spark SQL string.
 */
public static String translateHiveToSpark(String query) {
  RelNode relNode = new HiveToRelConverter(hiveMetastoreClient).convertSql(query);
  // Keep only the post-refactor two-arg factory call; the diff residue also contained the
  // removed one-arg overload, which would duplicate the `coralSpark` declaration.
  CoralSpark coralSpark = CoralSpark.create(relNode, hiveMetastoreClient);
  return coralSpark.getSparkSql();
}
}
68 changes: 63 additions & 5 deletions coral-spark/src/main/java/com/linkedin/coral/spark/CoralSpark.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.calcite.sql.SqlSelect;

import com.linkedin.coral.com.google.common.collect.ImmutableList;
import com.linkedin.coral.common.HiveMetastoreClient;
import com.linkedin.coral.spark.containers.SparkRelInfo;
import com.linkedin.coral.spark.containers.SparkUDFInfo;
import com.linkedin.coral.spark.dialect.SparkSqlDialect;
Expand All @@ -42,11 +43,22 @@ public class CoralSpark {
// Base tables referenced by the translated query (built by constructBaseTables).
private final List<String> baseTables;
// Spark UDF information objects for UDFs appearing in the translated query.
private final List<SparkUDFInfo> sparkUDFInfoList;
// The Spark SQL string produced from the Coral IR RelNode.
private final String sparkSql;
// Client for the Hive Metastore; null when built via the deprecated 3-arg constructor.
private final HiveMetastoreClient hiveMetastoreClient;

/**
 * Builds a CoralSpark without a metastore client.
 *
 * @deprecated use the constructor that also takes a {@link HiveMetastoreClient};
 *             this variant leaves {@code hiveMetastoreClient} as {@code null}.
 */
@Deprecated
private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList, String sparkSql) {
  this.hiveMetastoreClient = null;
  this.sparkSql = sparkSql;
  this.sparkUDFInfoList = sparkUDFInfoList;
  this.baseTables = baseTables;
}

/**
 * Builds a CoralSpark holding the translation results together with the
 * metastore client supplied by the caller.
 */
private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList, String sparkSql,
    HiveMetastoreClient hmsClient) {
  this.hiveMetastoreClient = hmsClient;
  this.sparkSql = sparkSql;
  this.sparkUDFInfoList = sparkUDFInfoList;
  this.baseTables = baseTables;
}

/**
 * Deprecated factory for building a CoralSpark from an IR RelNode without a metastore client.
 * NOTE(review): the full method description is collapsed in this diff view ("Expand Up" marker).
Expand Up @@ -61,8 +73,9 @@ private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList,
*
* @param irRelNode An IR RelNode for which CoralSpark will be constructed.
*
* @return [[CoralSpark]]
*/
@Deprecated
public static CoralSpark create(RelNode irRelNode) {
SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
// NOTE(review): unchanged statements are collapsed at the "Expand All" marker below —
// presumably they derive sparkRelNode, sparkSQL and baseTables as the non-deprecated
// overload does; confirm against the full file before relying on this view.
Expand All @@ -72,27 +85,72 @@ public static CoralSpark create(RelNode irRelNode) {
return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL);
}

/**
 * Main public API for obtaining a CoralSpark instance.
 *
 * Internally the IR RelNode is converted to a Spark RelNode, and the Spark RelNode
 * is rendered as Spark SQL. The returned instance contains:
 * 1) the Spark SQL string,
 * 2) the base tables, and
 * 3) the Spark UDF information objects, i.e. a list of {@link SparkUDFInfo}.
 *
 * @param irRelNode An IR RelNode for which CoralSpark will be constructed.
 * @param hmsClient client interface used to interact with the Hive Metastore service.
 *
 * @return [[CoralSpark]]
 */
public static CoralSpark create(RelNode irRelNode, HiveMetastoreClient hmsClient) {
  SparkRelInfo relInfo = IRRelToSparkRelTransformer.transform(irRelNode);
  Set<SparkUDFInfo> udfInfos = relInfo.getSparkUDFInfos();
  RelNode sparkRel = relInfo.getSparkRelNode();
  String sql = constructSparkSQL(sparkRel, udfInfos);
  List<String> tables = constructBaseTables(sparkRel);
  return new CoralSpark(tables, ImmutableList.copyOf(udfInfos), sql, hmsClient);
}

/**
 * Deprecated variant used to align the Coral-Spark translated SQL with a Coral-schema
 * output schema, without carrying a metastore client.
 *
 * @param irRelNode An IR RelNode for which CoralSpark will be constructed.
 * @param schema Coral schema that is represented by an Avro schema
 * @return [[CoralSpark]]
 * @deprecated use {@code create(RelNode, Schema, HiveMetastoreClient)} instead.
 */
@Deprecated
public static CoralSpark create(RelNode irRelNode, Schema schema) {
  // Avro field names become the explicit column aliases of the generated SQL.
  List<String> aliases = schema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
  // The rendered diff retained the removed `return createWithAlias(irRelNode, aliases);`
  // ahead of this body, making it unreachable; only the post-refactor inline body is kept.
  SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
  Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
  RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
  String sparkSQL = constructSparkSQLWithExplicitAlias(sparkRelNode, aliases, sparkUDFInfos);
  List<String> baseTables = constructBaseTables(sparkRelNode);
  return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL);
}

private static CoralSpark createWithAlias(RelNode irRelNode, List<String> aliases) {
/**
 * Main public API for obtaining a CoralSpark instance when the translated SQL must be
 * aligned with a Coral-schema output schema: the field names of the given Avro schema
 * are applied as explicit aliases in the generated Spark SQL.
 *
 * @param irRelNode An IR RelNode for which CoralSpark will be constructed.
 * @param schema Coral schema that is represented by an Avro schema
 * @param hmsClient client interface used to interact with the Hive Metastore service.
 * @return [[CoralSpark]]
 */
public static CoralSpark create(RelNode irRelNode, Schema schema, HiveMetastoreClient hmsClient) {
  List<String> fieldNames =
      schema.getFields().stream().map(field -> field.name()).collect(Collectors.toList());
  return createWithAlias(irRelNode, fieldNames, hmsClient);
}

/**
 * Builds a CoralSpark whose generated Spark SQL carries explicit column aliases.
 *
 * @param irRelNode An IR RelNode for which CoralSpark will be constructed.
 * @param aliases explicit column aliases to apply to the generated SQL.
 * @param hmsClient client interface used to interact with the Hive Metastore service.
 * @return [[CoralSpark]]
 */
private static CoralSpark createWithAlias(RelNode irRelNode, List<String> aliases, HiveMetastoreClient hmsClient) {
  SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
  Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
  RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
  String sparkSQL = constructSparkSQLWithExplicitAlias(sparkRelNode, aliases, sparkUDFInfos);
  List<String> baseTables = constructBaseTables(sparkRelNode);
  // Only the 4-arg constructor call is kept: the rendered diff also showed the removed
  // 3-arg call immediately before it, which would have been an unreachable duplicate return.
  return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL, hmsClient);
}

/**
Expand Down
Loading

0 comments on commit 4956719

Please sign in to comment.