Skip to content

Commit

Permalink
Misc fix
Browse files Browse the repository at this point in the history
  • Loading branch information
anishshri-db committed Sep 16, 2024
1 parent a870be2 commit d086f70
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ object StateSourceOptions extends DataSourceOptions {
Option(options.get(FLATTEN_COLLECTION_TYPES))
.map(_.toBoolean).getOrElse(true)
} catch {
case _: Exception =>
case _: IllegalArgumentException =>
throw StateDataSourceErrors.invalidOptionValue(FLATTEN_COLLECTION_TYPES,
"Boolean value is expected")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources.v2.state

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
Expand Down Expand Up @@ -147,7 +149,7 @@ abstract class StatePartitionReaderBase(

/**
* An implementation of [[StatePartitionReaderBase]] for the normal mode of State Data
* Source. It reads the the state at a particular batchId.
* Source. It reads the state at a particular batchId.
*/
class StatePartitionReader(
storeConf: StateStoreConf,
Expand Down Expand Up @@ -193,9 +195,9 @@ class StatePartitionReader(
.map { pair =>
val key = pair.key
val result = store.valuesIterator(key, stateVarName)
var unsafeRowArr: Seq[UnsafeRow] = Seq.empty
val unsafeRowArr = ArrayBuffer[UnsafeRow]()
result.foreach { entry =>
unsafeRowArr = unsafeRowArr :+ entry.copy()
unsafeRowArr += entry.copy()
}
// convert the list of values to array type
val arrData = new GenericArrayData(unsafeRowArr.toArray)
Expand Down

0 comments on commit d086f70

Please sign in to comment.