From d086f700a2522ad9aa383ce74e5b89b786406da8 Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Sun, 15 Sep 2024 20:38:03 -0700 Subject: [PATCH] Misc fix --- .../execution/datasources/v2/state/StateDataSource.scala | 2 +- .../datasources/v2/state/StatePartitionReader.scala | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala index 74b5b9c07aa53..edc937881ae7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala @@ -381,7 +381,7 @@ object StateSourceOptions extends DataSourceOptions { Option(options.get(FLATTEN_COLLECTION_TYPES)) .map(_.toBoolean).getOrElse(true) } catch { - case _: Exception => + case _: IllegalArgumentException => throw StateDataSourceErrors.invalidOptionValue(FLATTEN_COLLECTION_TYPES, "Boolean value is expected") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala index 0ee07c052fc4d..05222de7ca36c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.datasources.v2.state +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} @@ -147,7 +149,7 @@ abstract class StatePartitionReaderBase( /** * An implementation of [[StatePartitionReaderBase]] for the normal mode of State Data - * Source. It reads the the state at a particular batchId. + * Source. It reads the state at a particular batchId. */ class StatePartitionReader( storeConf: StateStoreConf, @@ -193,9 +195,9 @@ class StatePartitionReader( .map { pair => val key = pair.key val result = store.valuesIterator(key, stateVarName) - var unsafeRowArr: Seq[UnsafeRow] = Seq.empty + val unsafeRowArr = ArrayBuffer[UnsafeRow]() result.foreach { entry => - unsafeRowArr = unsafeRowArr :+ entry.copy() + unsafeRowArr += entry.copy() } // convert the list of values to array type val arrData = new GenericArrayData(unsafeRowArr.toArray)