
Bump Spark version
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed May 22, 2024
1 parent b5eb552 commit b27877c
Showing 5 changed files with 14 additions and 9 deletions.
3 changes: 2 additions & 1 deletion build.sbt
@@ -5,7 +5,7 @@
import Dependencies._

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.4.1"
lazy val sparkVersion = "3.5.1"
lazy val opensearchVersion = "2.6.0"
lazy val icebergVersion = "1.5.0"

@@ -14,6 +14,7 @@ val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".")

ThisBuild / organization := "org.opensearch"

+// TODO: Bump to 0.4.1 ?
ThisBuild / version := "0.4.0-SNAPSHOT"

ThisBuild / scalaVersion := scala212
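As a quick sketch (not part of the commit): evaluating the same sparkMinorVersion expression against the bumped version in a Scala REPL yields "3.5".

// Same derivation as the build.sbt line above, applied to the new version.
val sparkVersion = "3.5.1"
val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".")
// sparkMinorVersion == "3.5"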
@@ -48,4 +48,6 @@ case class FlintWrite(
override def toBatch: BatchWrite = this

override def toStreaming: StreamingWrite = this

+override def useCommitCoordinator(): Boolean = false
}
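The new override opts this write out of Spark's driver-side commit coordination, so task commits are accepted without being pre-authorized by the OutputCommitCoordinator. A minimal sketch of the BatchWrite hook involved (hypothetical class name, stub commit logic; not Flint's actual writer):

import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}

// Hypothetical BatchWrite that disables the commit coordinator, mirroring the override above.
class NoCoordinatorBatchWrite(factory: DataWriterFactory) extends BatchWrite {
  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = factory

  // Returning false tells Spark not to route each task commit through the driver's
  // OutputCommitCoordinator before accepting its WriterCommitMessage.
  override def useCommitCoordinator(): Boolean = false

  override def commit(messages: Array[WriterCommitMessage]): Unit = {}
  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
}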
@@ -118,7 +118,8 @@ class FlintJacksonParser(
array.toArray[InternalRow](schema)
}
case START_ARRAY =>
-throw QueryExecutionErrors.cannotParseJsonArraysAsStructsError()
+throw QueryExecutionErrors.cannotParseJsonArraysAsStructsError(
+  parser.currentToken().asString())
}
}

@@ -537,19 +538,19 @@
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
-throw BadRecordException(() => recordLiteral(record), () => None, e)
+throw BadRecordException(() => recordLiteral(record), cause = e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
-throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
-partialResult = () => Some(row),
+partialResults = () => Array(row),
cause)
}
}
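For context on the corrupted-record comment above: in PERMISSIVE mode the malformed input lands in the configured corrupt-record column while every other field is null. A small self-contained sketch using the public JSON reader (made-up data; not the Flint parser itself):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val spark = SparkSession.builder().master("local[*]").appName("permissive-json-sketch").getOrCreate()
import spark.implicits._

// Schema includes the corrupt-record column so malformed rows keep their raw text there.
val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("_corrupt_record", StringType)

val df = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(Seq("""{"id": 1, "name": "ok"}""", """{"id": 2, "name":""").toDS())

// The truncated second record parses as (null, null, raw text): all fields other than
// _corrupt_record are set to null, as the comment above describes.
df.show(false)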
@@ -10,7 +10,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COL

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.functions.isnull
import org.apache.spark.sql.types.StructType
@@ -96,7 +96,7 @@ case class FlintSparkSkippingFileIndex(
.toSet
}

-private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatus) = {
+private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatusWithMetadata) = {
selectedFiles.contains(f.getPath.toUri.toString)
}
}
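Background for the signature change: in Spark 3.5, PartitionDirectory entries are FileStatusWithMetadata wrappers rather than raw FileStatus values, which is also why the test below changes. A rough sketch of the wrapper usage, with a made-up path and the single-argument constructor seen in the test:

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.execution.datasources.FileStatusWithMetadata

// Wrap a plain Hadoop FileStatus; extra metadata is left at its default in this sketch.
val raw = new FileStatus(0L, false, 0, 0L, 0L, new Path("/tmp/data/part-00000.parquet"))
val wrapped = FileStatusWithMetadata(raw)

// Path-based checks such as isFileNotSkipped above go through the wrapper's getPath.
val uri = wrapped.getPath.toUri.toString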
@@ -16,7 +16,7 @@ import org.apache.spark.FlintSuite
import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate}
-import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

@@ -118,7 +118,8 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers {

private def mockPartitions(partitions: Map[String, Seq[String]]): Seq[PartitionDirectory] = {
partitions.map { case (partitionName, filePaths) =>
-val files = filePaths.map(path => new FileStatus(0, false, 0, 0, 0, new Path(path)))
+val files = filePaths.map(path =>
+  FileStatusWithMetadata(new FileStatus(0, false, 0, 0, 0, new Path(path))))
PartitionDirectory(InternalRow(Literal(partitionName)), files)
}.toSeq
}
