Skip to content

Commit

Permalink
updating tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jolanrensen committed Mar 17, 2024
1 parent e234f40 commit 66a42ac
Show file tree
Hide file tree
Showing 5 changed files with 667 additions and 653 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ApiTest : ShouldSpec({
withSpark(props = mapOf("spark.sql.codegen.comments" to true)) {

should("Create Seqs") {
spark.createDataset(seqOf(1, 2, 3), encoder())
spark.createDataset(seqOf(1, 2, 3), kotlinEncoderFor())
.collectAsList() shouldBe listOf(1, 2, 3)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.CalendarInterval
import org.jetbrains.kotlinx.spark.api.tuples.*
import org.jetbrains.kotlinx.spark.extensions.DemoCaseClass
import scala.*
import java.math.BigDecimal
import java.sql.Date
Expand Down Expand Up @@ -211,9 +210,9 @@ class EncodingTest : ShouldSpec({

should("handle Scala Case class datasets") {
val caseClasses = listOf(
DemoCaseClass(1, "1"),
DemoCaseClass(2, "2"),
DemoCaseClass(3, "3"),
tupleOf(1, "1"),
tupleOf(2, "2"),
tupleOf(3, "3"),
)
val dataset = caseClasses.toDS()
dataset.show()
Expand All @@ -222,9 +221,9 @@ class EncodingTest : ShouldSpec({

should("handle Scala Case class with data class datasets") {
val caseClasses = listOf(
DemoCaseClass(1, "1" to 1L),
DemoCaseClass(2, "2" to 2L),
DemoCaseClass(3, "3" to 3L),
tupleOf(1, "1" to 1L),
tupleOf(2, "2" to 2L),
tupleOf(3, "3" to 3L),
)
val dataset = caseClasses.toDS()
dataset.show()
Expand All @@ -233,9 +232,9 @@ class EncodingTest : ShouldSpec({

should("handle data class with Scala Case class datasets") {
val caseClasses = listOf(
1 to DemoCaseClass(1, "1"),
2 to DemoCaseClass(2, "2"),
3 to DemoCaseClass(3, "3"),
1 to tupleOf(1, "1"),
2 to tupleOf(2, "2"),
3 to tupleOf(3, "3"),
)
val dataset = caseClasses.toDS()
dataset.show()
Expand All @@ -244,9 +243,9 @@ class EncodingTest : ShouldSpec({

should("handle data class with Scala Case class & deeper datasets") {
val caseClasses = listOf(
1 to DemoCaseClass(1, "1" to DemoCaseClass(1, 1.0)),
2 to DemoCaseClass(2, "2" to DemoCaseClass(2, 2.0)),
3 to DemoCaseClass(3, "3" to DemoCaseClass(3, 3.0)),
1 to tupleOf(1, "1" to tupleOf(1, 1.0)),
2 to tupleOf(2, "2" to tupleOf(2, 2.0)),
3 to tupleOf(3, "3" to tupleOf(3, 3.0)),
)
val dataset = caseClasses.toDS()
dataset.show()
Expand Down Expand Up @@ -426,7 +425,7 @@ class EncodingTest : ShouldSpec({
}

should("Generate schema correctly with nullalble list and map") {
val schema = encoder<NullFieldAbleDataClass>().schema()
val schema = kotlinEncoderFor<NullFieldAbleDataClass>().schema()
schema.fields().forEach {
it.nullable() shouldBe true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,15 @@ import io.kotest.matchers.collections.shouldContainAll
import io.kotest.matchers.shouldBe
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.streaming.Checkpoint
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils
import org.jetbrains.kotlinx.spark.api.tuples.X
import org.jetbrains.kotlinx.spark.api.tuples.component1
import org.jetbrains.kotlinx.spark.api.tuples.component2
import org.jetbrains.kotlinx.spark.api.tuples.t
import org.jetbrains.kotlinx.spark.extensions.KSparkExtensions
import org.jetbrains.kotlinx.spark.extensions.`KSparkExtensions$`
import scala.Tuple2
import java.io.File
import java.io.Serializable
Expand Down Expand Up @@ -202,18 +199,36 @@ class StreamingTest : ShouldSpec({
})


private val scalaCompatVersion = `KSparkExtensions$`.`MODULE$`.scalaCompatVersion()
private val sparkVersion = `KSparkExtensions$`.`MODULE$`.sparkVersion()
private fun createTempDir() = Utils.createTempDir(
private val scalaCompatVersion = SCALA_COMPAT_VERSION
private val sparkVersion = SPARK_VERSION
/**
 * Creates a throwaway temp *directory* for streaming/checkpoint tests, named after the
 * Scala compat and Spark versions, and registers it for deletion on JVM exit.
 *
 * BUG FIX: the previous `File.createTempFile(System.getProperty("java.io.tmpdir"), "spark_…")`
 * was doubly wrong — `createTempFile(prefix, suffix)` creates a plain *file* (callers such as
 * `createCorruptedCheckpoint` treat the result as a directory to write checkpoint files into),
 * and it used the tmpdir *path* as the filename prefix, which contains path separators and
 * throws IOException on Unix. `Files.createTempDirectory` creates a real unique directory
 * under the default temp location.
 */
private fun createTempDir(): File =
    java.nio.file.Files.createTempDirectory("spark_${scalaCompatVersion}_${sparkVersion}")
        .toFile()
        .apply { deleteOnExit() }

/**
 * Builds the path of the checkpoint file for [checkpointTime] under [checkpointDir] by
 * delegating to Spark's internal `Checkpoint.checkpointFile(String, Time)`.
 *
 * Uses reflection because the target lives on a Scala companion object
 * (`org.apache.spark.streaming.Checkpoint$`) that is not directly callable from this
 * Kotlin module (replacing the removed `KSparkExtensions` helper — see the imports diff above).
 *
 * @param checkpointDir  directory the checkpoint file would live in
 * @param checkpointTime batch time the checkpoint file is named after
 * @return the Hadoop [Path] Spark would use for that checkpoint
 */
private fun checkpointFile(checkpointDir: String, checkpointTime: Time): Path {
    // Scala compiles `object Checkpoint` to class `Checkpoint$` with a static MODULE$ singleton.
    val klass = Class.forName("org.apache.spark.streaming.Checkpoint$")
    val moduleField = klass.getField("MODULE$").also { it.isAccessible = true }
    // null receiver: MODULE$ is a static field.
    val module = moduleField.get(null)
    val checkpointFileMethod = klass.getMethod("checkpointFile", String::class.java, Time::class.java)
        .also { it.isAccessible = true }
    // Cast is safe per Spark's declared return type of checkpointFile.
    return checkpointFileMethod.invoke(module, checkpointDir, checkpointTime) as Path
}

/**
 * Lists the checkpoint files present in [checkpointDir] by delegating to Spark's internal
 * `Checkpoint.getCheckpointFiles(String, Option[FileSystem])`.
 *
 * Same reflection approach as [checkpointFile]: the target method lives on the Scala
 * companion object `Checkpoint$`, which is not directly accessible from Kotlin here.
 *
 * @param checkpointDir directory to scan for checkpoint files
 * @param fs optional Hadoop [FileSystem]; pass `scala.Option.empty()` / `None` to let Spark resolve it
 * @return Scala immutable Seq of checkpoint file paths (may be empty)
 */
private fun getCheckpointFiles(checkpointDir: String, fs: scala.Option<FileSystem>): scala.collection.immutable.Seq<Path> {
    // Scala compiles `object Checkpoint` to class `Checkpoint$` with a static MODULE$ singleton.
    val klass = Class.forName("org.apache.spark.streaming.Checkpoint$")
    val moduleField = klass.getField("MODULE$").also { it.isAccessible = true }
    // null receiver: MODULE$ is a static field.
    val module = moduleField.get(null)
    val getCheckpointFilesMethod = klass.getMethod("getCheckpointFiles", String::class.java, scala.Option::class.java)
        .also { it.isAccessible = true }
    // Unchecked cast: element type is erased at runtime; matches Spark's declared return type.
    return getCheckpointFilesMethod.invoke(module, checkpointDir, fs) as scala.collection.immutable.Seq<Path>
}

private fun createCorruptedCheckpoint(): String {
val checkpointDirectory = createTempDir().absolutePath
val fakeCheckpointFile = Checkpoint.checkpointFile(checkpointDirectory, Time(1000))
val fakeCheckpointFile = checkpointFile(checkpointDirectory, Time(1000))
FileUtils.write(File(fakeCheckpointFile.toString()), "spark_corrupt_${scalaCompatVersion}_${sparkVersion}", StandardCharsets.UTF_8)
assert(Checkpoint.getCheckpointFiles(checkpointDirectory, (null as FileSystem?).toOption()).nonEmpty())
assert(getCheckpointFiles(checkpointDirectory, (null as FileSystem?).toOption()).nonEmpty())
return checkpointDirectory
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TypeInferenceTest : ShouldSpec({
data class Test2<T>(val vala2: T, val para2: Pair<T, String>)
data class Test<T>(val vala: T, val tripl1: Triple<T, Test2<Long>, T>)

val struct = Struct.fromJson(schema(typeOf<Pair<String, Test<Int>>>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<Pair<String, Test<Int>>>().schema().prettyJson())!!
should("contain correct typings") {
expect(struct.fields).notToEqualNull().toContain.inAnyOrder.only.entries(
hasField("first", "string"),
Expand Down Expand Up @@ -68,7 +68,7 @@ class TypeInferenceTest : ShouldSpec({
data class Test2<T>(val vala2: T, val para2: Pair<T, Single<Double>>)
data class Test<T>(val vala: T, val tripl1: Triple<T, Test2<Long>, T>)

val struct = Struct.fromJson(schema(typeOf<Pair<String, Test<Int>>>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<Pair<String, Test<Int>>>().schema().prettyJson())!!
should("contain correct typings") {
expect(struct.fields).notToEqualNull().toContain.inAnyOrder.only.entries(
hasField("first", "string"),
Expand Down Expand Up @@ -99,7 +99,7 @@ class TypeInferenceTest : ShouldSpec({
context("org.jetbrains.spark.api.org.jetbrains.spark.api.schema without generics") {
data class Test(val a: String, val b: Int, val c: Double)

val struct = Struct.fromJson(schema(typeOf<Test>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<Test>().schema().prettyJson())!!
should("return correct types too") {
expect(struct.fields).notToEqualNull().toContain.inAnyOrder.only.entries(
hasField("a", "string"),
Expand All @@ -109,7 +109,7 @@ class TypeInferenceTest : ShouldSpec({
}
}
context("type with list of ints") {
val struct = Struct.fromJson(schema(typeOf<List<Int>>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<List<Int>>().schema().prettyJson())!!
should("return correct types too") {
expect(struct) {
isOfType("array")
Expand All @@ -118,7 +118,7 @@ class TypeInferenceTest : ShouldSpec({
}
}
context("type with list of Pairs int to long") {
val struct = Struct.fromJson(schema(typeOf<List<Pair<Int, Long>>>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<List<Pair<Int, Long>>>().schema().prettyJson())!!
should("return correct types too") {
expect(struct) {
isOfType("array")
Expand All @@ -134,7 +134,7 @@ class TypeInferenceTest : ShouldSpec({
context("type with list of generic data class with E generic name") {
data class Test<E>(val e: E)

val struct = Struct.fromJson(schema(typeOf<List<Test<String>>>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<List<Test<String>>>().schema().prettyJson())!!
should("return correct types too") {
expect(struct) {
isOfType("array")
Expand All @@ -147,7 +147,7 @@ class TypeInferenceTest : ShouldSpec({
}
}
context("type with list of list of int") {
val struct = Struct.fromJson(schema(typeOf<List<List<Int>>>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<List<List<Int>>>().schema().prettyJson())!!
should("return correct types too") {
expect(struct) {
isOfType("array")
Expand All @@ -158,7 +158,7 @@ class TypeInferenceTest : ShouldSpec({
}
}
context("Subtypes of list") {
val struct = Struct.fromJson(schema(typeOf<ArrayList<Int>>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<ArrayList<Int>>().schema().prettyJson())!!
should("return correct types too") {
expect(struct) {
isOfType("array")
Expand All @@ -168,7 +168,7 @@ class TypeInferenceTest : ShouldSpec({
}
}
context("Subtypes of list with nullable values") {
val struct = Struct.fromJson(schema(typeOf<ArrayList<Int?>>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<ArrayList<Int?>>().schema().prettyJson())!!
should("return correct types too") {
expect(struct) {
isOfType("array")
Expand All @@ -180,7 +180,7 @@ class TypeInferenceTest : ShouldSpec({
context("data class with props in order lon → lat") {
data class Test(val lon: Double, val lat: Double)

val struct = Struct.fromJson(schema(typeOf<Test>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<Test>().schema().prettyJson())!!
should("Not change order of fields") {
expect(struct.fields).notToEqualNull().containsExactly(
hasField("lon", "double"),
Expand All @@ -191,7 +191,7 @@ class TypeInferenceTest : ShouldSpec({
context("data class with nullable list inside") {
data class Sample(val optionList: List<Int>?)

val struct = Struct.fromJson(schema(typeOf<Sample>()).prettyJson())!!
val struct = Struct.fromJson(kotlinEncoderFor<Sample>().schema().prettyJson())!!

should("show that list is nullable and element is not") {
expect(struct)
Expand All @@ -213,7 +213,7 @@ class TypeInferenceTest : ShouldSpec({
}

should("generate valid serializer schema") {
expect(encoder<Sample>().schema()) {
expect(kotlinEncoderFor<Sample>().schema()) {
this
.feature("data type", { this.fields()?.toList() }) {
this.notToEqualNull().toContain.inOrder.only.entry {
Expand Down
Loading

0 comments on commit 66a42ac

Please sign in to comment.