Merge branch 'master' into sql_batch_handler_in_interator
# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
miland-db committed Aug 12, 2024
2 parents 9675b3d + 2fb8dff commit 47485ff
Showing 67 changed files with 1,925 additions and 1,159 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/build_and_test.yml
@@ -874,13 +874,19 @@ jobs:
ln -s "$(which python3.9)" "/usr/local/bin/python3"
# Build docs first with SKIP_API to ensure they are buildable without requiring any
# language docs to be built beforehand.
- cd docs; SKIP_API=1 bundle exec jekyll build; cd ..
+ cd docs; SKIP_ERRORDOC=1 SKIP_API=1 bundle exec jekyll build; cd ..
if [ -f "./dev/is-changed.py" ]; then
# Skip PySpark and SparkR docs while keeping Scala/Java/SQL docs
pyspark_modules=`cd dev && python3.9 -c "import sparktestsupport.modules as m; print(','.join(m.name for m in m.all_modules if m.name.startswith('pyspark')))"`
if [ `./dev/is-changed.py -m $pyspark_modules` = false ]; then export SKIP_PYTHONDOC=1; fi
if [ `./dev/is-changed.py -m sparkr` = false ]; then export SKIP_RDOC=1; fi
fi
# Print the values of environment variables `SKIP_ERRORDOC`, `SKIP_SCALADOC`, `SKIP_PYTHONDOC`, `SKIP_RDOC` and `SKIP_SQLDOC`
echo "SKIP_ERRORDOC: $SKIP_ERRORDOC"
echo "SKIP_SCALADOC: $SKIP_SCALADOC"
echo "SKIP_PYTHONDOC: $SKIP_PYTHONDOC"
echo "SKIP_RDOC: $SKIP_RDOC"
echo "SKIP_SQLDOC: $SKIP_SQLDOC"
cd docs
bundle exec jekyll build
- name: Tar documentation
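For reference, the new skip flags can also be exercised when building the docs locally; a minimal sketch, assuming a working Ruby/Jekyll setup and that the flags behave as they do in the workflow above:

    # Build the docs while skipping the generated error-condition, Python and R API docs;
    # Scala/Java/SQL docs are still produced.
    cd docs
    SKIP_ERRORDOC=1 SKIP_PYTHONDOC=1 SKIP_RDOC=1 bundle exec jekyll build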
48 changes: 48 additions & 0 deletions assembly/pom.xml
@@ -109,6 +109,16 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!--
Included so the Spark Connect client is compiled before triggering assembly.
See 'get-connect-client-jar' below. This will not be included in the packaging output.
-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-connect-client-jvm_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!--
Because we don't shade dependencies anymore, we need to restore Guava to compile scope so
@@ -159,6 +169,44 @@
</target>
</configuration>
</plugin>
<plugin>
<!--
Copy Spark Connect client jar and its dependencies for Spark Connect REPL.
-->
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<id>copy-connect-client-repl-jars</id>
<phase>package</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>cp</executable>
<arguments>
<argument>-r</argument>
<argument>${basedir}/../connector/connect/client/jvm/target/connect-repl</argument>
<argument>${basedir}/target/scala-${scala.binary.version}/jars/</argument>
</arguments>
</configuration>
</execution>
<execution>
<id>copy-connect-client-jar</id>
<phase>package</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>cp</executable>
<arguments>
<argument>${basedir}/../connector/connect/client/jvm/target/spark-connect-client-jvm_${scala.binary.version}-${version}.jar</argument>
<argument>${basedir}/target/scala-${scala.binary.version}/jars/connect-repl</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

38 changes: 36 additions & 2 deletions bin/spark-shell
@@ -34,7 +34,7 @@ fi

export _SPARK_CMD_USAGE="Usage: ./bin/spark-shell [options]
- Scala REPL options:
+ Scala REPL options, Spark Classic only:
-I <file> preload <file>, enforcing line-by-line interpretation"

# SPARK-4161: scala does not assume use of the java classpath,
@@ -44,8 +44,42 @@ Scala REPL options:
# through spark.driver.extraClassPath is not automatically propagated.
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

# In order to start the Spark Connect shell, we need to identify whether spark.remote
# or --remote is set. Spark Connect does not yet support loading configurations.
connect_shell=false
cur_arg="$0"
for arg in "${@:1}"
do
# --conf spark.remote=... or -c spark.remote=...
if [[ $cur_arg == "--conf" || $cur_arg == "-c" ]]; then
if [[ $arg == "spark.remote"* ]]; then
connect_shell=true
fi
fi

# --conf=spark.remote=... or -c=spark.remote=...
if [[ $arg == "--conf=spark.remote"* || $arg == "-c=spark.remote"* ]]; then
connect_shell=true
fi

# --remote= or --remote
if [[ $arg == "--remote"* ]]; then
connect_shell=true
fi
cur_arg=$arg
done

if [ -n "${SPARK_REMOTE}" ]; then
connect_shell=true
fi

function main() {
- if $cygwin; then
+ if $connect_shell; then
+   export SPARK_SUBMIT_OPTS
+   export SPARK_CONNECT_SHELL=1
+   "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.application.ConnectRepl --name "Connect shell" "$@"
+ elif $cygwin; then
# Workaround for issue involving JLine and Cygwin
# (see http://sourceforge.net/p/jline/bugs/40/).
# If you're using the Mintty terminal emulator in Cygwin, you may need to set the
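As a usage sketch of the detection logic above (the connection URL is illustrative; any value of spark.remote or SPARK_REMOTE takes the same path), each of the following would set connect_shell=true and launch org.apache.spark.sql.application.ConnectRepl through spark-submit:

    ./bin/spark-shell --remote "sc://localhost"
    ./bin/spark-shell --conf spark.remote=sc://localhost
    SPARK_REMOTE="sc://localhost" ./bin/spark-shell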
CollationAwareUTF8String.java
@@ -32,10 +32,13 @@

import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
* Utility class for collation-aware UTF8String operations.
@@ -1226,6 +1229,60 @@ public static UTF8String trimRight(
return UTF8String.fromString(src.substring(0, charIndex));
}

public static UTF8String[] splitSQL(final UTF8String input, final UTF8String delim,
final int limit, final int collationId) {
if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
return input.split(delim, limit);
} else if (CollationFactory.fetchCollation(collationId).supportsLowercaseEquality) {
return lowercaseSplitSQL(input, delim, limit);
} else {
return icuSplitSQL(input, delim, limit, collationId);
}
}

public static UTF8String[] lowercaseSplitSQL(final UTF8String string, final UTF8String delimiter,
final int limit) {
if (delimiter.numBytes() == 0) return new UTF8String[] { string };
if (string.numBytes() == 0) return new UTF8String[] { UTF8String.EMPTY_UTF8 };
Pattern pattern = Pattern.compile(Pattern.quote(delimiter.toString()),
CollationSupport.lowercaseRegexFlags);
String[] splits = pattern.split(string.toString(), limit);
UTF8String[] res = new UTF8String[splits.length];
for (int i = 0; i < res.length; i++) {
res[i] = UTF8String.fromString(splits[i]);
}
return res;
}

public static UTF8String[] icuSplitSQL(final UTF8String string, final UTF8String delimiter,
final int limit, final int collationId) {
if (delimiter.numBytes() == 0) return new UTF8String[] { string };
if (string.numBytes() == 0) return new UTF8String[] { UTF8String.EMPTY_UTF8 };
List<UTF8String> strings = new ArrayList<>();
String target = string.toString(), pattern = delimiter.toString();
StringSearch stringSearch = CollationFactory.getStringSearch(target, pattern, collationId);
int start = 0, end;
while ((end = stringSearch.next()) != StringSearch.DONE) {
if (limit > 0 && strings.size() == limit - 1) {
break;
}
strings.add(UTF8String.fromString(target.substring(start, end)));
start = end + stringSearch.getMatchLength();
}
if (start <= target.length()) {
strings.add(UTF8String.fromString(target.substring(start)));
}
if (limit == 0) {
// Remove trailing empty strings
int i = strings.size() - 1;
while (i >= 0 && strings.get(i).numBytes() == 0) {
strings.remove(i);
i--;
}
}
return strings.toArray(new UTF8String[0]);
}

// TODO: Add more collation-aware UTF8String operations here.

}
CollationSupport.java
@@ -20,8 +20,6 @@

import org.apache.spark.unsafe.types.UTF8String;

- import java.util.ArrayList;
- import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

@@ -62,33 +60,11 @@ public static UTF8String[] execBinary(final UTF8String string, final UTF8String
return string.splitSQL(delimiter, -1);
}
public static UTF8String[] execLowercase(final UTF8String string, final UTF8String delimiter) {
- if (delimiter.numBytes() == 0) return new UTF8String[] { string };
- if (string.numBytes() == 0) return new UTF8String[] { UTF8String.EMPTY_UTF8 };
- Pattern pattern = Pattern.compile(Pattern.quote(delimiter.toString()),
-   CollationSupport.lowercaseRegexFlags);
- String[] splits = pattern.split(string.toString(), -1);
- UTF8String[] res = new UTF8String[splits.length];
- for (int i = 0; i < res.length; i++) {
-   res[i] = UTF8String.fromString(splits[i]);
- }
- return res;
+ return CollationAwareUTF8String.lowercaseSplitSQL(string, delimiter, -1);
}
public static UTF8String[] execICU(final UTF8String string, final UTF8String delimiter,
final int collationId) {
- if (delimiter.numBytes() == 0) return new UTF8String[] { string };
- if (string.numBytes() == 0) return new UTF8String[] { UTF8String.EMPTY_UTF8 };
- List<UTF8String> strings = new ArrayList<>();
- String target = string.toString(), pattern = delimiter.toString();
- StringSearch stringSearch = CollationFactory.getStringSearch(target, pattern, collationId);
- int start = 0, end;
- while ((end = stringSearch.next()) != StringSearch.DONE) {
-   strings.add(UTF8String.fromString(target.substring(start, end)));
-   start = end + stringSearch.getMatchLength();
- }
- if (start <= target.length()) {
-   strings.add(UTF8String.fromString(target.substring(start)));
- }
- return strings.toArray(new UTF8String[0]);
+ return CollationAwareUTF8String.icuSplitSQL(string, delimiter, -1, collationId);
}
}

@@ -696,7 +672,7 @@ public static boolean supportsLowercaseRegex(final int collationId) {
return CollationFactory.fetchCollation(collationId).supportsLowercaseEquality;
}

- private static final int lowercaseRegexFlags = Pattern.UNICODE_CASE | Pattern.CASE_INSENSITIVE;
+ static final int lowercaseRegexFlags = Pattern.UNICODE_CASE | Pattern.CASE_INSENSITIVE;
public static int collationAwareRegexFlags(final int collationId) {
return supportsLowercaseRegex(collationId) ? lowercaseRegexFlags : 0;
}
37 changes: 37 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -256,6 +256,26 @@
"Error reading streaming state file of <fileToRead> does not exist. If the stream job is restarted with a new or updated state operation, please create a new checkpoint location or clear the existing checkpoint location."
]
},
"HDFS_STORE_PROVIDER_OUT_OF_MEMORY" : {
"message" : [
"Could not load HDFS state store with id <stateStoreId> because of an out of memory exception."
]
},
"INVALID_CHANGE_LOG_READER_VERSION" : {
"message" : [
"The change log reader version cannot be <version>."
]
},
"INVALID_CHANGE_LOG_WRITER_VERSION" : {
"message" : [
"The change log writer version cannot be <version>."
]
},
"ROCKSDB_STORE_PROVIDER_OUT_OF_MEMORY" : {
"message" : [
"Could not load RocksDB state store with id <stateStoreId> because of an out of memory exception."
]
},
"SNAPSHOT_PARTITION_ID_NOT_FOUND" : {
"message" : [
"Partition id <snapshotPartitionId> not found for state of operator <operatorId> at <checkpointLocation>."
@@ -625,6 +645,17 @@
],
"sqlState" : "40000"
},
"CONFLICTING_PARTITION_COLUMN_NAMES" : {
"message" : [
"Conflicting partition column names detected:",
"<distinctPartColLists>",
"For partitioned table directories, data files should only live in leaf directories.",
"And directories at the same level should have the same partition column name.",
"Please check the following directories for unexpected files or inconsistent partition column names:",
"<suspiciousPaths>"
],
"sqlState" : "KD009"
},
"CONNECT" : {
"message" : [
"Generic Spark Connect error."
@@ -1941,6 +1972,12 @@
],
"sqlState" : "22003"
},
"INVALID_BOOLEAN_STATEMENT" : {
"message" : [
"Boolean statement is expected in the condition, but <invalidStatement> was found."
],
"sqlState" : "22546"
},
"INVALID_BOUNDARY" : {
"message" : [
"The boundary <boundary> is invalid: <invalidValue>."
AvroSuite.scala
@@ -760,7 +760,7 @@ abstract class AvroSuite
assert(uncompressSize > deflateSize)
assert(snappySize > deflateSize)
assert(snappySize > bzip2Size)
- assert(bzip2Size > xzSize)
+ assert(xzSize > bzip2Size)
assert(uncompressSize > zstandardSize)
}
}
22 changes: 22 additions & 0 deletions connector/connect/client/jvm/pom.xml
@@ -194,6 +194,28 @@
</transformers>
</configuration>
</plugin>
<plugin>
<!--
Here we download the Spark Connect client dependencies for the REPL and copy the
Spark Connect client to the target's jars/connect-repl directory (see assembly/pom.xml).
Those jars are only loaded when running the Spark Connect shell; see also SPARK-49198.
-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>get-ammonite-jar</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/connect-repl</outputDirectory>
<includeScope>provided</includeScope>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
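Taken together with the assembly/pom.xml change above, the expected result after packaging is roughly the layout below; a sketch, assuming Scala binary version 2.13 and with <version> standing in for the Spark version being built:

    # Dependencies are downloaded by maven-dependency-plugin into connector/connect/client/jvm/target/connect-repl,
    # then copied (plus the client jar itself) by the exec-maven-plugin in assembly/pom.xml:
    ls assembly/target/scala-2.13/jars/connect-repl/
    #   ammonite and the other provided-scope REPL dependencies
    #   spark-connect-client-jvm_2.13-<version>.jar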