Skip to content

Commit

Permalink
Add SparkDataType as wrapper for unmapped spark data type
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Jan 31, 2024
1 parent 6fcf31b commit 76f5afb
Show file tree
Hide file tree
Showing 8 changed files with 439 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.data.type;

import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.type.ExprType;

/** Wrapper of spark data type */
@EqualsAndHashCode
@RequiredArgsConstructor
public class SparkDataType implements ExprType {

/** Spark datatype name. */
private final String typeName;

@Override
public String typeName() {
return typeName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.data.value;

import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.model.AbstractExprValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.spark.data.type.SparkDataType;

/** SparkExprValue hold spark query response value. */
@RequiredArgsConstructor
public class SparkExprValue extends AbstractExprValue {

private final SparkDataType type;
private final Object value;

@Override
public Object value() {
return value;
}

@Override
public ExprType type() {
return type;
}

@Override
public int compare(ExprValue other) {
throw new UnsupportedOperationException("SparkExprValue is not comparable");
}

@Override
public boolean equal(ExprValue other) {
return value.equals(other.value());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.sql.data.model.ExprFloatValue;
import org.opensearch.sql.data.model.ExprIntegerValue;
import org.opensearch.sql.data.model.ExprLongValue;
import org.opensearch.sql.data.model.ExprNullValue;
import org.opensearch.sql.data.model.ExprShortValue;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTimestampValue;
Expand All @@ -27,6 +28,8 @@
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.spark.data.type.SparkDataType;
import org.opensearch.sql.spark.data.value.SparkExprValue;

/** Default implementation of SparkSqlFunctionResponseHandle. */
public class DefaultSparkSqlFunctionResponseHandle implements SparkSqlFunctionResponseHandle {
Expand Down Expand Up @@ -64,30 +67,43 @@ private static LinkedHashMap<String, ExprValue> extractRow(
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
for (ExecutionEngine.Schema.Column column : columnList) {
ExprType type = column.getExprType();
if (type == ExprCoreType.BOOLEAN) {
linkedHashMap.put(column.getName(), ExprBooleanValue.of(row.getBoolean(column.getName())));
} else if (type == ExprCoreType.LONG) {
linkedHashMap.put(column.getName(), new ExprLongValue(row.getLong(column.getName())));
} else if (type == ExprCoreType.INTEGER) {
linkedHashMap.put(column.getName(), new ExprIntegerValue(row.getInt(column.getName())));
} else if (type == ExprCoreType.SHORT) {
linkedHashMap.put(column.getName(), new ExprShortValue(row.getInt(column.getName())));
} else if (type == ExprCoreType.BYTE) {
linkedHashMap.put(column.getName(), new ExprByteValue(row.getInt(column.getName())));
} else if (type == ExprCoreType.DOUBLE) {
linkedHashMap.put(column.getName(), new ExprDoubleValue(row.getDouble(column.getName())));
} else if (type == ExprCoreType.FLOAT) {
linkedHashMap.put(column.getName(), new ExprFloatValue(row.getFloat(column.getName())));
} else if (type == ExprCoreType.DATE) {
// TODO :: correct this to ExprTimestampValue
linkedHashMap.put(column.getName(), new ExprStringValue(row.getString(column.getName())));
} else if (type == ExprCoreType.TIMESTAMP) {
linkedHashMap.put(
column.getName(), new ExprTimestampValue(row.getString(column.getName())));
} else if (type == ExprCoreType.STRING) {
linkedHashMap.put(column.getName(), new ExprStringValue(jsonString(row, column.getName())));
if (!row.has(column.getName())) {
linkedHashMap.put(column.getName(), ExprNullValue.of());
} else {
throw new RuntimeException("Result contains invalid data type");
if (type == ExprCoreType.BOOLEAN) {
linkedHashMap.put(
column.getName(), ExprBooleanValue.of(row.getBoolean(column.getName())));
} else if (type == ExprCoreType.LONG) {
linkedHashMap.put(column.getName(), new ExprLongValue(row.getLong(column.getName())));
} else if (type == ExprCoreType.INTEGER) {
linkedHashMap.put(column.getName(), new ExprIntegerValue(row.getInt(column.getName())));
} else if (type == ExprCoreType.SHORT) {
linkedHashMap.put(column.getName(), new ExprShortValue(row.getInt(column.getName())));
} else if (type == ExprCoreType.BYTE) {
linkedHashMap.put(column.getName(), new ExprByteValue(row.getInt(column.getName())));
} else if (type == ExprCoreType.DOUBLE) {
linkedHashMap.put(column.getName(), new ExprDoubleValue(row.getDouble(column.getName())));
} else if (type == ExprCoreType.FLOAT) {
linkedHashMap.put(column.getName(), new ExprFloatValue(row.getFloat(column.getName())));
} else if (type == ExprCoreType.DATE) {
// TODO :: correct this to ExprTimestampValue
linkedHashMap.put(column.getName(), new ExprStringValue(row.getString(column.getName())));
} else if (type == ExprCoreType.TIMESTAMP) {
linkedHashMap.put(
column.getName(), new ExprTimestampValue(row.getString(column.getName())));
} else if (type == ExprCoreType.STRING) {
linkedHashMap.put(column.getName(), new ExprStringValue(row.getString(column.getName())));
} else {
// SparkDataType
Object jsonValue = row.get(column.getName());
Object value = jsonValue;
if (jsonValue instanceof JSONObject) {
value = ((JSONObject) jsonValue).toMap();
} else if (jsonValue instanceof JSONArray) {
value = ((JSONArray) jsonValue).toList();
}
linkedHashMap.put(column.getName(), new SparkExprValue((SparkDataType) type, value));
}
}
}

Expand All @@ -107,8 +123,8 @@ private List<ExecutionEngine.Schema.Column> getColumnList(JSONArray schema) {
return columnList;
}

private ExprCoreType getDataType(String sparkDataType) {
switch (sparkDataType) {
private ExprType getDataType(String sparkType) {
switch (sparkType) {
case "boolean":
return ExprCoreType.BOOLEAN;
case "long":
Expand All @@ -128,18 +144,12 @@ private ExprCoreType getDataType(String sparkDataType) {
case "date":
return ExprCoreType.TIMESTAMP;
case "string":
case "varchar":
case "char":
return ExprCoreType.STRING;
default:
return ExprCoreType.UNKNOWN;
return new SparkDataType(sparkType);
}
}

private static String jsonString(JSONObject jsonObject, String key) {
return jsonObject.has(key) ? jsonObject.getString(key) : "";
}

@Override
public boolean hasNext() {
return responseIterator.hasNext();
Expand Down
Loading

0 comments on commit 76f5afb

Please sign in to comment.