From 20b10ff20d9874892d20a4d1d71c3b3e4fde61d5 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Mon, 24 Jul 2023 19:33:53 +0800 Subject: [PATCH 1/4] Add test cases for ORC writing according to options orc.compress and compression Signed-off-by: Chong Gao --- .../GpuFileScanPrunePartitionSuite.scala | 1 - .../spark/rapids/GpuReaderTypeSuites.scala | 1 - .../nvidia/spark/rapids/OrcQuerySuite.scala | 54 +++++++++++++++++++ .../rapids/SparkQueryCompareTestSuite.scala | 18 +++++-- 4 files changed, 68 insertions(+), 6 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuFileScanPrunePartitionSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuFileScanPrunePartitionSuite.scala index f54b27b8fc7..41a52e01739 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuFileScanPrunePartitionSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuFileScanPrunePartitionSuite.scala @@ -18,7 +18,6 @@ package com.nvidia.spark.rapids import org.apache.spark.SparkConf import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.FileUtils.withTempPath import org.apache.spark.sql.rapids.GpuFileSourceScanExec class GpuFileScanPrunePartitionSuite extends SparkQueryCompareTestSuite { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuReaderTypeSuites.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuReaderTypeSuites.scala index 31086038201..9627055116e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuReaderTypeSuites.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuReaderTypeSuites.scala @@ -20,7 +20,6 @@ import com.nvidia.spark.rapids.RapidsReaderType._ import com.nvidia.spark.rapids.shims.GpuBatchScanExec import org.apache.spark.SparkConf -import org.apache.spark.sql.FileUtils.withTempPath import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.functions.input_file_name import org.apache.spark.sql.rapids.{ExternalSource, GpuFileSourceScanExec} diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala index 9e2c92b6a76..53180a73345 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala @@ -18,13 +18,21 @@ package com.nvidia.spark.rapids import java.io.File +import com.nvidia.spark.rapids.Arm.withResource +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileUtil.fullyDelete +import org.apache.hadoop.fs.Path +import org.apache.orc.OrcFile import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.rapids.{MyDenseVector, MyDenseVectorUDT} import org.apache.spark.sql.types._ +/** + * This corresponds to the Spark class: + * org.apache.spark.sql.execution.datasources.orc.OrcQueryTest + */ class OrcQuerySuite extends SparkQueryCompareTestSuite { private def getSchema: StructType = new StructType(Array( @@ -92,4 +100,50 @@ class OrcQuerySuite extends SparkQueryCompareTestSuite { ) { frame => frame } + + private def getOrcFilePostfix(compression: String): String = + if ("NONE" == compression) ".orc" else s".${compression.toLowerCase()}.orc" + + def checkCompressType(compression: Option[String], orcCompress: Option[String]): Unit = { + withGpuSparkSession { spark => + withTempPath { file => + var writer = spark.range(0, 10).write + writer = compression.map(t => writer.option("compression", t)).getOrElse(writer) + writer = 
orcCompress.map(t => writer.option("orc.compress", t)).getOrElse(writer)
+        writer.orc(file.getCanonicalPath)
+
+        // first use compression, then orc.compress
+        val expectedType = compression.getOrElse(orcCompress.get)
+        val maybeOrcFile = file.listFiles()
+          .find(_.getName.endsWith(getOrcFilePostfix(expectedType)))
+        assert(maybeOrcFile.isDefined)
+
+        val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
+        val conf = OrcFile.readerOptions(new Configuration())
+        withResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
+          assert(expectedType === reader.getCompressionKind.name)
+        }
+      }
+    }
+  }
+
+  test("SPARK-16610: Respect orc.compress (i.e., OrcConf.COMPRESS) when compression is unset") {
+    // TODO: add ZLIB once GPU ORC writing supports it
+    // Respect `orc.compress` (i.e., OrcConf.COMPRESS).
+    Seq("SNAPPY", "ZSTD").foreach { orcCompress =>
+      checkCompressType(None, Some(orcCompress))
+    }
+
+    // "compression" overrides "orc.compress"
+    Seq(("SNAPPY", "ZSTD"), ("ZSTD", "SNAPPY")).foreach { case (compression, orcCompress) =>
+      checkCompressType(Some(compression), Some(orcCompress))
+    }
+  }
+
+  test("Compression options for writing to an ORC file (SNAPPY, ZLIB and NONE)") {
+    // TODO: add ZLIB once GPU ORC writing supports it
+    Seq("SNAPPY", "ZSTD", "NONE").foreach { compression =>
+      checkCompressType(Some(compression), None)
+    }
+  }
 }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
index 5c4138611ab..22f129f5250 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
@@ -15,16 +15,18 @@
  */
 package com.nvidia.spark.rapids
 
-import java.io.File
+import java.io.{File, IOException}
 import java.nio.file.Files
 import java.sql.{Date, Timestamp}
-import java.util.{Locale, TimeZone}
+import java.util.{Locale, TimeZone, UUID}
 
-import org.scalatest.Assertion
-import org.scalatest.funsuite.AnyFunSuite
 import scala.reflect.ClassTag
 import scala.util.{Failure, Try}
 
+import org.apache.hadoop.fs.FileUtil
+import org.scalatest.Assertion
+import org.scalatest.funsuite.AnyFunSuite
+
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -2141,4 +2143,12 @@ trait SparkQueryCompareTestSuite extends AnyFunSuite {
       false
     }
   }
+
+  def withTempPath(func: File => Unit): Unit = {
+    val rootTmpDir = System.getProperty("java.io.tmpdir")
+    val dirFile = new File(rootTmpDir, "spark-test-" + UUID.randomUUID)
+    Files.createDirectories(dirFile.toPath)
+    if (!dirFile.delete()) throw new IOException(s"Delete $dirFile failed!")
+    try func(dirFile) finally FileUtil.fullyDelete(dirFile)
+  }
 }
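
A note on the withTempPath helper added above: it reserves a unique directory
name under java.io.tmpdir, then deletes the directory again before invoking
the test function, because DataFrameWriter refuses to write to a path that
already exists under the default SaveMode.ErrorIfExists. A minimal usage
sketch (the SparkSession here is illustrative, not part of the patch):

    withTempPath { dir =>
      // "spark" is assumed to be an active SparkSession supplied by the suite
      spark.range(0, 10).write.orc(dir.getCanonicalPath)
      // Spark recreates the directory and fills it with part files
      assert(dir.listFiles().exists(_.getName.endsWith(".orc")))
    }
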
From 7f0fee2af5e4581f49a2ba699c2c811b43b73c0e Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Tue, 25 Jul 2023 13:04:10 +0800
Subject: [PATCH 2/4] Fix 321cdh and 330cdh compile errors; refactor

---
 .../nvidia/spark/rapids/OrcQuerySuite.scala   | 56 +++++++++++++++----
 .../rapids/SparkQueryCompareTestSuite.scala   |  4 ++
 2 files changed, 49 insertions(+), 11 deletions(-)

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala
index 53180a73345..6cc00ad7a48 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala
@@ -18,11 +18,12 @@ package com.nvidia.spark.rapids
 
 import java.io.File
 
-import com.nvidia.spark.rapids.Arm.withResource
+import scala.collection.mutable.ListBuffer
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileUtil.fullyDelete
 import org.apache.hadoop.fs.Path
-import org.apache.orc.OrcFile
+import org.apache.orc.{OrcFile, Reader}
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -102,7 +103,11 @@ class OrcQuerySuite extends SparkQueryCompareTestSuite {
   }
 
   private def getOrcFilePostfix(compression: String): String =
-    if ("NONE" == compression) ".orc" else s".${compression.toLowerCase()}.orc"
+    if (Seq("NONE", "UNCOMPRESSED").contains(compression)) {
+      ".orc"
+    } else {
+      s".${compression.toLowerCase()}.orc"
+    }
 
   def checkCompressType(compression: Option[String], orcCompress: Option[String]): Unit = {
     withGpuSparkSession { spark =>
@@ -110,39 +115,68 @@ class OrcQuerySuite extends SparkQueryCompareTestSuite {
         var writer = spark.range(0, 10).write
         writer = compression.map(t => writer.option("compression", t)).getOrElse(writer)
         writer = orcCompress.map(t => writer.option("orc.compress", t)).getOrElse(writer)
+        // write ORC file on GPU
         writer.orc(file.getCanonicalPath)
 
-        // first use compression, then orc.compress
-        val expectedType = compression.getOrElse(orcCompress.get)
+        // expectedType: first use compression, then orc.compress
+        var expectedType = compression.getOrElse(orcCompress.get)
+        // ORC uses NONE for UNCOMPRESSED
+        if (expectedType == "UNCOMPRESSED") expectedType = "NONE"
         val maybeOrcFile = file.listFiles()
           .find(_.getName.endsWith(getOrcFilePostfix(expectedType)))
         assert(maybeOrcFile.isDefined)
 
+        // check the compression type using the ORC jar
         val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
         val conf = OrcFile.readerOptions(new Configuration())
-        withResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
+        var reader: Reader = null
+        try {
+          reader = OrcFile.createReader(orcFilePath, conf)
+          // check
           assert(expectedType === reader.getCompressionKind.name)
+        } finally {
+          // close the reader if needed
+          reader match {
+            case closeableReader: AutoCloseable => closeableReader.close()
+            // the reader is not an AutoCloseable for Spark CDH
+            // 321cdh uses orc-core-1.5.1.7.1.7.1000-141.jar
+            // 330cdh uses orc-core-1.5.1.7.1.8.0-801.jar
+            case _ => // do nothing for 321cdh and 330cdh
+          }
         }
       }
     }
   }
 
+  private val supportedWriteCompressTypes = {
+    // GPU ORC writing does not support ZLIB or LZ4, refer to GpuOrcFileFormat
+    val supportedWriteCompressType = ListBuffer("UNCOMPRESSED", "NONE", "ZSTD", "SNAPPY")
+    // 321cdh and 330cdh do not support ZSTD, refer to the CDH class:
+    // org.apache.spark.sql.execution.datasources.orc.OrcOptions
+    // Spark 3.1.x does not support lz4, zstd
+    if (isCdh321 || isCdh330 || !VersionUtils.isSpark320OrLater) {
+      supportedWriteCompressType -= "ZSTD"
+    }
+    supportedWriteCompressType
+  }
+
   test("SPARK-16610: Respect orc.compress (i.e., OrcConf.COMPRESS) when compression is unset") {
-    // TODO: add ZLIB once GPU ORC writing supports it
     // Respect `orc.compress` (i.e., OrcConf.COMPRESS).
-    Seq("SNAPPY", "ZSTD").foreach { orcCompress =>
+    supportedWriteCompressTypes.foreach { orcCompress =>
       checkCompressType(None, Some(orcCompress))
     }
 
+    // make paris, e.g.: [("UNCOMPRESSED", "NONE"), ("NONE", "SNAPPY"), ("SNAPPY", "ZSTD") ... ]
+    val pairs = supportedWriteCompressTypes.sliding(2).toList.map(pair => (pair.head, pair.last))
+
     // "compression" overrides "orc.compress"
-    Seq(("SNAPPY", "ZSTD"), ("ZSTD", "SNAPPY")).foreach { case (compression, orcCompress) =>
+    pairs.foreach { case (compression, orcCompress) =>
       checkCompressType(Some(compression), Some(orcCompress))
     }
   }
 
   test("Compression options for writing to an ORC file (SNAPPY, ZLIB and NONE)") {
-    // TODO: add ZLIB once GPU ORC writing supports it
-    Seq("SNAPPY", "ZSTD", "NONE").foreach { compression =>
+    supportedWriteCompressTypes.foreach { compression =>
       checkCompressType(Some(compression), None)
     }
   }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
index 22f129f5250..68c6737715c 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
@@ -2151,4 +2151,8 @@ trait SparkQueryCompareTestSuite extends AnyFunSuite {
     if (!dirFile.delete()) throw new IOException(s"Delete $dirFile failed!")
     try func(dirFile) finally FileUtil.fullyDelete(dirFile)
   }
+
+  def isCdh321: Boolean = VersionUtils.isCloudera && cmpSparkVersion(3, 2, 1) == 0
+
+  def isCdh330: Boolean = VersionUtils.isCloudera && cmpSparkVersion(3, 3, 0) == 0
 }
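
The try/finally block introduced in checkCompressType above works around a
classpath difference: org.apache.orc.Reader is closeable in recent ORC
releases, but the CDH builds named in the comments ship an orc-core 1.5.1
fork whose Reader is not an AutoCloseable. A generic sketch of that pattern
(an illustration of the idea, not necessarily how the plugin's
Arm.withResourceIfAllowed is implemented):

    // Run `block` with `resource`, closing the resource afterwards only if
    // its runtime type supports closing on this classpath.
    def withResourceIfCloseable[T, V](resource: T)(block: T => V): V = {
      try {
        block(resource)
      } finally {
        resource match {
          case c: AutoCloseable => c.close()
          case _ => // not closeable in this ORC version, nothing to release
        }
      }
    }

On the pair generation: sliding(2) walks a sequence with a window of two
elements, so List("NONE", "SNAPPY", "ZSTD").sliding(2) yields
List("NONE", "SNAPPY") and List("SNAPPY", "ZSTD"), which the subsequent map
turns into the tuples ("NONE", "SNAPPY") and ("SNAPPY", "ZSTD").
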
From cb42932d4f53584d2337c52179b61b28df81115a Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Thu, 27 Jul 2023 13:15:09 +0800
Subject: [PATCH 3/4] Fix compile error

---
 tests/pom.xml                                              | 6 ++++++
 .../test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala | 4 ++--
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/tests/pom.xml b/tests/pom.xml
index f54c924084e..96b0001cef7 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -350,6 +350,12 @@
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
+       <dependency>
+           <groupId>org.apache.orc</groupId>
+           <artifactId>orc-core</artifactId>
+           <version>${spark.version}</version>
+           <scope>provided</scope>
+       </dependency>
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala
index e0092f94b50..b9a71a7ca29 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala
@@ -24,9 +24,9 @@ import com.nvidia.spark.rapids.Arm.withResourceIfAllowed
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileUtil.fullyDelete
 import org.apache.hadoop.fs.Path
-import org.apache.orc.{OrcFile, Reader}
-
+import org.apache.orc.OrcFile
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.rapids.{MyDenseVector, MyDenseVectorUDT}
 import org.apache.spark.sql.types._
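
The tests/pom.xml change makes the ORC reader API available at compile time
while keeping it off the runtime classpath, where the Spark distribution
under test supplies its own orc-core; the provided scope is what prevents the
two copies from clashing. With that dependency in place, the footer check in
the suite reduces to the plain ORC API. A standalone sketch, with an
illustrative file path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.orc.OrcFile

    // Open the ORC footer and report which codec the writer recorded.
    val options = OrcFile.readerOptions(new Configuration())
    val reader = OrcFile.createReader(new Path("/tmp/example.orc"), options)
    try {
      println(reader.getCompressionKind.name) // e.g. SNAPPY, ZSTD or NONE
    } finally {
      reader match {
        case c: AutoCloseable => c.close()
        case _ => // older orc-core: the reader is not closeable
      }
    }
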
From a38b750ec4e549b34dacb7ab9be1cce3a626bd60 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 28 Jul 2023 10:04:37 +0800
Subject: [PATCH 4/4] Fix nits

---
 .../test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala
index 3f040faa200..388398c1ad6 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/OrcQuerySuite.scala
@@ -158,7 +158,7 @@ class OrcQuerySuite extends SparkQueryCompareTestSuite {
     }
   }
 
-  private def getOrcFilePostfix(compression: String): String =
+  private def getOrcFileSuffix(compression: String): String =
     if (Seq("NONE", "UNCOMPRESSED").contains(compression)) {
       ".orc"
     } else {
@@ -179,7 +179,7 @@ class OrcQuerySuite extends SparkQueryCompareTestSuite {
         // ORC uses NONE for UNCOMPRESSED
         if (expectedType == "UNCOMPRESSED") expectedType = "NONE"
         val maybeOrcFile = file.listFiles()
-          .find(_.getName.endsWith(getOrcFilePostfix(expectedType)))
+          .find(_.getName.endsWith(getOrcFileSuffix(expectedType)))
         assert(maybeOrcFile.isDefined)
 
         // check the compression type using the ORC jar
@@ -215,7 +215,7 @@ class OrcQuerySuite extends SparkQueryCompareTestSuite {
       checkCompressType(None, Some(orcCompress))
     }
 
-    // make paris, e.g.: [("UNCOMPRESSED", "NONE"), ("NONE", "SNAPPY"), ("SNAPPY", "ZSTD") ... ]
+    // make pairs, e.g.: [("UNCOMPRESSED", "NONE"), ("NONE", "SNAPPY"), ("SNAPPY", "ZSTD") ... ]
     val pairs = supportedWriteCompressTypes.sliding(2).toList.map(pair => (pair.head, pair.last))
 
     // "compression" overrides "orc.compress"
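
Taken together, the series pins down the option precedence that Spark defines
in org.apache.spark.sql.execution.datasources.orc.OrcOptions and that the
plugin must match: the DataFrameWriter option "compression" wins over
"orc.compress", which in turn wins over the session conf
spark.sql.orc.compression.codec. A usage sketch of the case the pair tests
assert (session setup elided, output path illustrative):

    // Both options set: the files should come out ZSTD-compressed,
    // because "compression" overrides "orc.compress".
    spark.range(0, 10)
      .write
      .option("orc.compress", "SNAPPY")
      .option("compression", "ZSTD")
      .orc("/tmp/orc-compress-demo")
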