From b27877c86223b231806a38fd33eb9d3655606e59 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 22 May 2024 10:49:02 -0700
Subject: [PATCH] Bump Spark version

Signed-off-by: Chen Dai
---
 build.sbt                                                  | 3 ++-
 .../scala/org/apache/spark/sql/flint/FlintWrite.scala      | 2 ++
 .../apache/spark/sql/flint/json/FlintJacksonParser.scala   | 9 +++++----
 .../spark/skipping/FlintSparkSkippingFileIndex.scala       | 4 ++--
 .../skipping/FlintSparkSkippingFileIndexSuite.scala        | 5 +++--
 5 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/build.sbt b/build.sbt
index 6d6058e95..99fafc5bb 100644
--- a/build.sbt
+++ b/build.sbt
@@ -5,7 +5,7 @@
 import Dependencies._
 
 lazy val scala212 = "2.12.14"
-lazy val sparkVersion = "3.4.1"
+lazy val sparkVersion = "3.5.1"
 lazy val opensearchVersion = "2.6.0"
 lazy val icebergVersion = "1.5.0"
 
@@ -14,6 +14,7 @@ val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".")
 
 ThisBuild / organization := "org.opensearch"
 
+// TODO: Bump to 0.4.1 ?
 ThisBuild / version := "0.4.0-SNAPSHOT"
 
 ThisBuild / scalaVersion := scala212
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala
index 25b4db940..8691de3d0 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala
@@ -48,4 +48,6 @@ case class FlintWrite(
   override def toBatch: BatchWrite = this
 
   override def toStreaming: StreamingWrite = this
+
+  override def useCommitCoordinator(): Boolean = false
 }
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala
index 96e0b40fb..a9e9122fb 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala
@@ -118,7 +118,8 @@ class FlintJacksonParser(
           array.toArray[InternalRow](schema)
         }
       case START_ARRAY =>
-        throw QueryExecutionErrors.cannotParseJsonArraysAsStructsError()
+        throw QueryExecutionErrors.cannotParseJsonArraysAsStructsError(
+          parser.currentToken().asString())
     }
   }
 
@@ -537,7 +538,7 @@ class FlintJacksonParser(
         // JSON parser currently doesn't support partial results for corrupted records.
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
-        throw BadRecordException(() => recordLiteral(record), () => None, e)
+        throw BadRecordException(() => recordLiteral(record), cause = e)
       case e: CharConversionException if options.encoding.isEmpty =>
         val msg =
           """JSON parser cannot handle a character in its input.
@@ -545,11 +546,11 @@ class FlintJacksonParser(
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+        throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException)
       case PartialResultException(row, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
-          partialResult = () => Some(row),
+          partialResults = () => Array(row),
           cause)
     }
   }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala
index bd7abcfb3..dc7875ccf 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala
@@ -10,7 +10,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COL
 
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory}
 import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.functions.isnull
 import org.apache.spark.sql.types.StructType
@@ -96,7 +96,7 @@ case class FlintSparkSkippingFileIndex(
       .toSet
   }
 
-  private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatus) = {
+  private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatusWithMetadata) = {
     selectedFiles.contains(f.getPath.toUri.toString)
   }
 }
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala
index d2ef72158..4b707841c 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala
@@ -16,7 +16,7 @@ import org.apache.spark.FlintSuite
 import org.apache.spark.sql.{Column, DataFrame, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate}
-import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
 
@@ -118,7 +118,8 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers {
 
   private def mockPartitions(partitions: Map[String, Seq[String]]): Seq[PartitionDirectory] = {
     partitions.map { case (partitionName, filePaths) =>
-      val files = filePaths.map(path => new FileStatus(0, false, 0, 0, 0, new Path(path)))
+      val files = filePaths.map(path =>
+        FileStatusWithMetadata(new FileStatus(0, false, 0, 0, 0, new Path(path))))
       PartitionDirectory(InternalRow(Literal(partitionName)), files)
     }.toSeq
   }