[SPARK-44784][CONNECT] Make SBT testing hermetic
### What changes were proposed in this pull request?
This PR makes a bunch of changes to Connect testing for the Scala client:
- We no longer start the Connect server with the `SPARK_DIST_CLASSPATH` environment variable. This variable is set by the build system, but its value differs between SBT and Maven; for SBT it also contained the client code.
- We use dependency upload to add the dependencies needed for the tests (see the sketch after this list). Currently this entails the compiled test classes (class files), the scalatest jars, and the scalactic jars.
- The use of class-file sync unearthed an issue with stubbing and the `ExecutorClassLoader`: if they load classes in the same namespace, stubbing generates stubs for classes that the `ExecutorClassLoader` could have loaded. Since this is mostly a testing issue, I decided to move the test code to a different namespace. We should definitely fix this properly later on.
- A bunch of tiny fixes.
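
For illustration, a minimal sketch of what the dependency-upload approach looks like from the client side, assuming the public Scala client APIs (`remote`, `create`, `addArtifact`); the connection string and jar paths are placeholders:

```scala
import org.apache.spark.sql.SparkSession

object UploadTestDepsSketch {
  def main(args: Array[String]): Unit = {
    // Instead of leaking jars into the server via SPARK_DIST_CLASSPATH, the
    // client uploads everything the tests need as session artifacts.
    // Connection string and jar paths are illustrative placeholders.
    val spark = SparkSession.builder().remote("sc://localhost:15002").create()
    spark.addArtifact("/path/to/scalatest.jar")             // scalatest jar
    spark.addArtifact("/path/to/scalactic.jar")             // scalactic jar
    spark.addArtifact("/path/to/compiled-test-classes.jar") // compiled test classes
  }
}
```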

### Why are the changes needed?
SBT testing for Connect leaked client-side code into the server. This is a problem because tests pass and we sign off on features that do not actually work in a normal environment; stubbing was an example of this. Maven did not have this problem and was therefore more correct. A quick way to check for this kind of leak is sketched below.
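
A hedged, REPL-style sketch of such a check (the class name is the `ToStub` test class added in this PR; where exactly the check would run server-side is left open):

```scala
// On a hermetic test server, client-only classes must not resolve. If this
// lookup succeeds in server-side code, the classpath has leaked.
val leaked =
  try {
    Class.forName("org.apache.spark.sql.connect.client.ToStub")
    true
  } catch {
    case _: ClassNotFoundException => false
  }
assert(!leaked, "client-only class is visible on the server: classpath leak")
```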

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The changes themselves are mostly tests.

### Was this patch authored or co-authored using generative AI tooling?
No. I write my own code, thank you...

Closes #42591 from hvanhovell/investigate-stubbing.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
hvanhovell authored and LuciferYang committed Aug 27, 2023
1 parent 23ce9c4 commit 9326615
Showing 46 changed files with 404 additions and 315 deletions.
6 changes: 2 additions & 4 deletions connector/connect/bin/spark-connect-build
@@ -29,7 +29,5 @@ SCALA_BINARY_VER=`grep "scala.binary.version" "${SPARK_HOME}/pom.xml" | head -n1
SCALA_VER=`grep "scala.version" "${SPARK_HOME}/pom.xml" | grep ${SCALA_BINARY_VER} | head -n1 | awk -F '[<>]' '{print $3}'`
SCALA_ARG="-Pscala-${SCALA_BINARY_VER}"

# Build the jars needed for spark submit and spark connect
build/sbt "${SCALA_ARG}" -Phive -Pconnect package || exit 1
# Build the jars needed for spark connect JVM client
build/sbt "${SCALA_ARG}" "sql/package;connect-client-jvm/assembly" || exit 1
# Build the jars needed for spark submit and spark connect JVM client
build/sbt "${SCALA_ARG}" -Phive -Pconnect package "connect-client-jvm/package" || exit 1
2 changes: 1 addition & 1 deletion connector/connect/bin/spark-connect-scala-client
@@ -45,7 +45,7 @@ SCALA_ARG="-Pscala-${SCALA_BINARY_VER}"
SCBUILD="${SCBUILD:-1}"
if [ "$SCBUILD" -eq "1" ]; then
# Build the jars needed for spark connect JVM client
build/sbt "${SCALA_ARG}" "sql/package;connect-client-jvm/assembly" || exit 1
build/sbt "${SCALA_ARG}" "connect-client-jvm/package" || exit 1
fi

if [ -z "$SCCLASSPATH" ]; then
3 changes: 1 addition & 2 deletions connector/connect/bin/spark-connect-scala-client-classpath
@@ -30,6 +30,5 @@ SCALA_VER=`grep "scala.version" "${SPARK_HOME}/pom.xml" | grep ${SCALA_BINARY_VE
SCALA_ARG="-Pscala-${SCALA_BINARY_VER}"

CONNECT_CLASSPATH="$(build/sbt "${SCALA_ARG}" -DcopyDependencies=false "export connect-client-jvm/fullClasspath" | grep jar | tail -n1)"
SQL_CLASSPATH="$(build/sbt "${SCALA_ARG}" -DcopyDependencies=false "export sql/fullClasspath" | grep jar | tail -n1)"

echo "$CONNECT_CLASSPATH:$CLASSPATH"
echo "$CONNECT_CLASSPATH"
5 changes: 0 additions & 5 deletions connector/connect/client/jvm/pom.xml
@@ -113,11 +113,6 @@
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Use mima to perform the compatibility check -->
<dependency>
<groupId>com.typesafe</groupId>
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connect.client

/**
* Class used to test stubbing. This needs to be in the main source tree, because this is not
* synced with the connect server during tests.
*/
case class ToStub(value: Long)
@@ -27,9 +27,8 @@
import static org.apache.spark.sql.Encoders.*;
import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.RowFactory.create;
import org.apache.spark.sql.connect.client.SparkConnectClient;
import org.apache.spark.sql.connect.client.util.SparkConnectServerUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.test.SparkConnectServerUtils;
import org.apache.spark.sql.types.StructType;

/**
@@ -40,14 +39,7 @@ public class JavaEncoderSuite implements Serializable {

@BeforeClass
public static void setup() {
SparkConnectServerUtils.start();
spark = SparkSession
.builder()
.client(SparkConnectClient
.builder()
.port(SparkConnectServerUtils.port())
.build())
.create();
spark = SparkConnectServerUtils.createSparkSession();
}

@AfterClass
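
For context, a sketch of what a centralized helper like `SparkConnectServerUtils.createSparkSession()` presumably wraps, reconstructed from the inline setup deleted above (the helper's actual internals are an assumption):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.client.SparkConnectClient
import org.apache.spark.sql.test.SparkConnectServerUtils.{port, start}

object ConnectTestSessionSketch {
  // Assumed internals, mirroring the setup the old test performed inline.
  def createSparkSession(): SparkSession = {
    start() // launch the shared local Connect server once per test run
    SparkSession
      .builder()
      .client(SparkConnectClient.builder().port(port).build())
      .create()
  }
}
```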
@@ -22,7 +22,7 @@ import java.io.{File, FilenameFilter}
import org.apache.commons.io.FileUtils

import org.apache.spark.SparkException
import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.sql.test.{RemoteSparkSession, SQLHelper}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import org.apache.spark.storage.StorageLevel

@@ -22,7 +22,7 @@ import java.util.Random
import org.scalatest.matchers.must.Matchers._

import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.sql.test.RemoteSparkSession

class ClientDataFrameStatSuite extends RemoteSparkSession {
private def toLetter(i: Int): String = (i + 97).toChar.toString
@@ -26,8 +26,8 @@ import org.scalatest.BeforeAndAfterEach

import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.client.{DummySparkConnectService, SparkConnectClient}
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.ConnectFunSuite

// Add sample tests.
// - sample fraction: simple.sample(0.1)
@@ -36,10 +36,10 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult}
import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
import org.apache.spark.sql.connect.client.util.SparkConnectServerUtils.port
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.test.{IntegrationTestUtils, RemoteSparkSession, SQLHelper}
import org.apache.spark.sql.test.SparkConnectServerUtils.port
import org.apache.spark.sql.types._

class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateMethodTester {
@@ -21,7 +21,7 @@ import java.io.ByteArrayOutputStream
import scala.collection.JavaConverters._

import org.apache.spark.sql.{functions => fn}
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.test.ConnectFunSuite
import org.apache.spark.sql.types._

/**
@@ -19,8 +19,8 @@ package org.apache.spark.sql

import scala.collection.JavaConverters._

import org.apache.spark.sql.connect.client.util.QueryTest
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.test.{QueryTest, SQLHelper}
import org.apache.spark.sql.types.{StringType, StructType}

class DataFrameNaFunctionSuite extends QueryTest with SQLHelper {
@@ -21,9 +21,9 @@ import java.util.Collections
import scala.collection.JavaConverters._

import org.apache.spark.sql.avro.{functions => avroFn}
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.functions._
import org.apache.spark.sql.protobuf.{functions => pbFn}
import org.apache.spark.sql.test.ConnectFunSuite
import org.apache.spark.sql.types.{DataType, StructType}

/**
@@ -20,9 +20,9 @@ import java.sql.Timestamp
import java.util.Arrays

import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append
import org.apache.spark.sql.connect.client.util.QueryTest
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.test.{QueryTest, SQLHelper}
import org.apache.spark.sql.types._

case class ClickEvent(id: String, timestamp: Timestamp)
@@ -37,11 +37,10 @@ import org.apache.spark.sql.avro.{functions => avroFn}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.connect.client.SparkConnectClient
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.connect.client.util.IntegrationTestUtils
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.protobuf.{functions => pbFn}
import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.SparkFileUtils
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.connect.client.SparkConnectClient
import org.apache.spark.sql.connect.client.arrow.{ArrowDeserializers, ArrowSerializer}
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.test.ConnectFunSuite

/**
* Test suite for SQL implicits.
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success}
import org.scalatest.concurrent.Eventually._

import org.apache.spark.SparkException
import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.sql.test.RemoteSparkSession
import org.apache.spark.util.SparkThreadUtils.awaitResult

/**
@@ -22,7 +22,7 @@ import scala.util.control.NonFatal

import io.grpc.{CallOptions, Channel, ClientCall, ClientInterceptor, MethodDescriptor}

import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.test.ConnectFunSuite

/**
* Tests for non-dataframe related SparkSession operations.
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.sql.connect.client.ToStub
import org.apache.spark.sql.test.RemoteSparkSession

class StubbingTestSuite extends RemoteSparkSession {
private def eval[T](f: => T): T = f

test("capture of to-be stubbed class") {
val session = spark
import session.implicits._
val result = spark
.range(0, 10, 1, 1)
.map(n => n + 1)
.as[ToStub]
.head()
eval {
assert(result.value == 1)
}
}
}
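
(Context: `ToStub` lives in the client's main source tree, which is not synced to the server during tests, so the server presumably has to stub it while executing the plan; the client then decodes the result using the real class.)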
@@ -14,17 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connect.client
package org.apache.spark.sql

import java.io.File
import java.nio.file.{Files, Paths}

import scala.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.sql.connect.common.ProtoDataTypes
import org.apache.spark.sql.expressions.ScalarUserDefinedFunction
import org.apache.spark.sql.test.RemoteSparkSession

class UDFClassLoadingE2ESuite extends RemoteSparkSession {

@@ -26,8 +26,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.api.java.function._
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder}
import org.apache.spark.sql.connect.client.util.QueryTest
import org.apache.spark.sql.functions.{col, struct, udf}
import org.apache.spark.sql.test.QueryTest
import org.apache.spark.sql.types.IntegerType

/**
@@ -215,33 +215,31 @@ class UserDefinedFunctionE2ETestSuite extends QueryTest {
}

test("Dataset foreachPartition") {
val sum = new AtomicLong()
val func: Iterator[JLong] => Unit = f => {
val sum = new AtomicLong()
f.foreach(v => sum.addAndGet(v))
// The value should be 45
assert(sum.get() == -1)
throw new Exception("Success, processed records: " + sum.get())
}
val exception = intercept[Exception] {
spark.range(10).repartition(1).foreachPartition(func)
}
assert(exception.getMessage.contains("45 did not equal -1"))
assert(exception.getMessage.contains("Success, processed records: 45"))
}

test("Dataset foreachPartition - java") {
val sum = new AtomicLong()
val exception = intercept[Exception] {
spark
.range(10)
.range(11)
.repartition(1)
.foreachPartition(new ForeachPartitionFunction[JLong] {
override def call(t: JIterator[JLong]): Unit = {
t.asScala.foreach(v => sum.addAndGet(v))
// The value should be 45
assert(sum.get() == -1)
throw new Exception("Success, processed records: " + sum.get())
}
})
}
assert(exception.getMessage.contains("45 did not equal -1"))
assert(exception.getMessage.contains("Success, processed records: 55"))
}

test("Dataset foreach: change not visible to client") {
@@ -20,9 +20,9 @@ import scala.reflect.runtime.universe.typeTag

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.connect.common.UdfPacket
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.test.ConnectFunSuite
import org.apache.spark.util.SparkSerDeUtils

class UserDefinedFunctionSuite extends ConnectFunSuite {
@@ -25,7 +25,7 @@ import scala.util.Properties
import org.apache.commons.io.output.ByteArrayOutputStream
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
import org.apache.spark.sql.test.{IntegrationTestUtils, RemoteSparkSession}

class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {

@@ -30,7 +30,7 @@ import org.scalatest.BeforeAndAfterEach

import org.apache.spark.connect.proto.AddArtifactsRequest
import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.test.ConnectFunSuite

class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {

@@ -24,7 +24,7 @@ import java.util.regex.Pattern
import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.lib.MiMaLib

import org.apache.spark.sql.connect.client.util.IntegrationTestUtils._
import org.apache.spark.sql.test.IntegrationTestUtils._

/**
* A tool for checking the binary compatibility of the connect client API against the spark SQL
@@ -20,7 +20,7 @@ import java.nio.file.Paths

import org.apache.commons.io.FileUtils

import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.test.ConnectFunSuite
import org.apache.spark.util.SparkFileUtils

class ClassFinderSuite extends ConnectFunSuite {
@@ -18,7 +18,7 @@ package org.apache.spark.sql.connect.client

import java.util.UUID

import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.test.ConnectFunSuite

/**
* Test suite for [[SparkConnectClient.Builder]] parsing and configuration.
@@ -31,8 +31,8 @@ import org.apache.spark.SparkException
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, AnalyzePlanRequest, AnalyzePlanResponse, ArtifactStatusesRequest, ArtifactStatusesResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.connect.common.config.ConnectCommon
import org.apache.spark.sql.test.ConnectFunSuite

class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {

@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils._
import org.apache.spark.sql.catalyst.util.SparkIntervalUtils._
import org.apache.spark.sql.connect.client.CloseableIterator
import org.apache.spark.sql.connect.client.arrow.FooEnum.FooEnum
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.test.ConnectFunSuite
import org.apache.spark.sql.types.{ArrayType, DataType, DayTimeIntervalType, Decimal, DecimalType, IntegerType, Metadata, SQLUserDefinedType, StructType, UserDefinedType, YearMonthIntervalType}

/**