[SPARK-45686][INFRA][CORE][SQL][SS][CONNECT][MLLIB][DSTREAM][AVRO][ML][K8S][YARN][PYTHON][R][UI][GRAPHX][PROTOBUF][TESTS][EXAMPLES] Explicitly convert `Array` to `Seq` when function input is defined as `Seq` to avoid compilation warnings related to `class LowPriorityImplicits2 is deprecated`

### What changes were proposed in this pull request?
This PR explicitly converts `Array` to `Seq` when a function's input is defined as `Seq`, to avoid compilation warnings like the following:

```
[error] /Users/yangjie01/SourceCode/git/spark-mine-sbt/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala:57:31: method copyArrayToImmutableIndexedSeq in class LowPriorityImplicits2 is deprecated (since 2.13.0): implicit conversions from Array to immutable.IndexedSeq are implemented by copying; use `toIndexedSeq` explicitly if you want to copy, or use the more efficient non-copying ArraySeq.unsafeWrapArray
[error] Applicable -Wconf / nowarn filters for this fatal warning: msg=<part of the message>, cat=deprecation, site=org.apache.spark.ml.linalg.Vector.equals, origin=scala.LowPriorityImplicits2.copyArrayToImmutableIndexedSeq, version=2.13.0
[error]             Vectors.equals(s1.indices, s1.values, s2.indices, s2.values)
[error]                               ^
```

There are mainly three ways to fix it:
- `tools` and `mllib-local` modules: since these modules do not depend on the `common-utils` module, `scala.collection.immutable.ArraySeq.unsafeWrapArray` is used directly.
- `examples` module: since `ArrayImplicits` is an internal utility class in Spark, `scala.collection.immutable.ArraySeq.unsafeWrapArray` is used directly.
- Other modules: by importing `ArrayImplicits` and calling `toImmutableArraySeq`, the `Array` is wrapped into an `immutable.ArraySeq` (a sketch of both styles follows this list).
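
As a rough sketch of the two styles (the `SparkArrayOps` extension below is a simplified stand-in for Spark's internal `org.apache.spark.util.ArrayImplicits`, not its exact source):

```scala
import scala.collection.immutable.ArraySeq

object WrapStyles {
  // A hypothetical function whose input is defined as Seq.
  def describe(names: Seq[String]): String = names.mkString(", ")

  // Sketch of the ArrayImplicits-style extension: wraps the array without copying.
  implicit class SparkArrayOps[T](val xs: Array[T]) extends AnyVal {
    def toImmutableArraySeq: ArraySeq[T] = ArraySeq.unsafeWrapArray(xs)
  }

  // tools / mllib-local / examples: call unsafeWrapArray directly.
  val viaUnsafeWrap: String = describe(ArraySeq.unsafeWrapArray(Array("a", "b", "c")))

  // Other modules: bring the extension into scope and call toImmutableArraySeq.
  val viaExtension: String = describe(Array("a", "b", "c").toImmutableArraySeq)
}
```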

### Why are the changes needed?
Clean up deprecated Scala API usage and use `Array.toImmutableArraySeq` (equivalent to `immutable.ArraySeq.unsafeWrapArray`) to avoid collection copies.

Why use `ArraySeq.unsafeWrapArray` instead of `toIndexedSeq`:

1. `ArraySeq.unsafeWrapArray` avoids the collection copy that `toIndexedSeq` performs, so it has lower memory overhead and certain performance advantages. Moreover, it is faster in scenarios such as the following (a short sketch of the copy vs. no-copy behavior follows this list):
  - `Array.fill.toImmutableArraySeq` versus `IndexedSeq.fill`
  - `Array.apply(data).toImmutableArraySeq` versus `IndexedSeq.apply(data)`
  - `Array.emptyXXArray.toImmutableArraySeq` versus `IndexedSeq.empty`.
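
A minimal sketch (plain Scala 2.13, no Spark code) of the copy vs. no-copy difference that point 1 relies on:

```scala
import scala.collection.immutable.ArraySeq

object WrapVsCopy extends App {
  val data = Array(1, 2, 3)

  val wrapped = ArraySeq.unsafeWrapArray(data) // no copy: backed by `data`
  val copied  = data.toIndexedSeq              // defensive copy of the elements

  data(0) = 42
  assert(wrapped(0) == 42) // the mutation is visible through the wrapper
  assert(copied(0) == 1)   // the copy made earlier is unaffected
}
```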

2. In Scala 2.12, when a function is defined as

```scala
def func(input: Seq[T]): R = {
	...
}
```

if an `Array` is passed as the function input, it is implicitly converted by default through the `scala.Predef#genericArrayOps` function; the relevant implementation is as follows:

```scala
  implicit def genericArrayOps[T](xs: Array[T]): ArrayOps[T] = (xs match {
    case x: Array[AnyRef]  => refArrayOps[AnyRef](x)
    case x: Array[Boolean] => booleanArrayOps(x)
    case x: Array[Byte]    => byteArrayOps(x)
    case x: Array[Char]    => charArrayOps(x)
    case x: Array[Double]  => doubleArrayOps(x)
    case x: Array[Float]   => floatArrayOps(x)
    case x: Array[Int]     => intArrayOps(x)
    case x: Array[Long]    => longArrayOps(x)
    case x: Array[Short]   => shortArrayOps(x)
    case x: Array[Unit]    => unitArrayOps(x)
    case null              => null
  }).asInstanceOf[ArrayOps[T]]

  implicit def booleanArrayOps(xs: Array[Boolean]): ArrayOps.ofBoolean   = new ArrayOps.ofBoolean(xs)
  implicit def byteArrayOps(xs: Array[Byte]): ArrayOps.ofByte            = new ArrayOps.ofByte(xs)
  implicit def charArrayOps(xs: Array[Char]): ArrayOps.ofChar            = new ArrayOps.ofChar(xs)
  implicit def doubleArrayOps(xs: Array[Double]): ArrayOps.ofDouble      = new ArrayOps.ofDouble(xs)
  implicit def floatArrayOps(xs: Array[Float]): ArrayOps.ofFloat         = new ArrayOps.ofFloat(xs)
  implicit def intArrayOps(xs: Array[Int]): ArrayOps.ofInt               = new ArrayOps.ofInt(xs)
  implicit def longArrayOps(xs: Array[Long]): ArrayOps.ofLong            = new ArrayOps.ofLong(xs)
  implicit def refArrayOps[T <: AnyRef](xs: Array[T]): ArrayOps.ofRef[T] = new ArrayOps.ofRef[T](xs)
  implicit def shortArrayOps(xs: Array[Short]): ArrayOps.ofShort         = new ArrayOps.ofShort(xs)
  implicit def unitArrayOps(xs: Array[Unit]): ArrayOps.ofUnit            = new ArrayOps.ofUnit(xs)
```

This implicit conversion wraps the input data into a `mutable.WrappedArray`; for example, `Array[Int]` data is wrapped into a `mutable.WrappedArray.ofInt`:

```scala
  final class ofInt(override val repr: Array[Int]) extends AnyVal with ArrayOps[Int] with ArrayLike[Int, Array[Int]] {

    override protected[this] def thisCollection: WrappedArray[Int] = new WrappedArray.ofInt(repr)
    override protected[this] def toCollection(repr: Array[Int]): WrappedArray[Int] = new WrappedArray.ofInt(repr)
    override protected[this] def newBuilder = new ArrayBuilder.ofInt

    def length: Int = repr.length
    def apply(index: Int): Int = repr(index)
    def update(index: Int, elem: Int) { repr(index) = elem }
  }

  final class ofInt(val array: Array[Int]) extends WrappedArray[Int] with Serializable {
    def elemTag = ClassTag.Int
    def length: Int = array.length
    def apply(index: Int): Int = array(index)
    def update(index: Int, elem: Int) { array(index) = elem }
    override def hashCode = MurmurHash3.wrappedArrayHash(array)
    override def equals(that: Any) = that match {
      case that: ofInt => Arrays.equals(array, that.array)
      case _ => super.equals(that)
    }
  }
```

As we can see, in Scala 2.12 an `Array` input is implicitly converted into a `mutable.WrappedArray`, and no collection copy is performed.

In Scala 2.13, the default implicit conversion performs a defensive collection copy. However, given how Spark already behaved on Scala 2.12, we can assume it is still safe to explicitly wrap an `Array` input into an `immutable.ArraySeq` without copying.
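
A small Scala 2.13 sketch of the call-site difference described above (the `sum` function here is hypothetical):

```scala
import scala.collection.immutable.ArraySeq

object CallSiteSketch extends App {
  def sum(xs: Seq[Int]): Int = xs.sum // hypothetical function defined with a Seq input

  val data = Array(1, 2, 3)

  // Relying on the default Scala 2.13 implicit conversion copies the array and
  // triggers the LowPriorityImplicits2 deprecation warning quoted at the top of this PR.
  val viaImplicit = sum(data)

  // Explicitly wrapping keeps the old Scala 2.12 behavior: the Seq is a no-copy
  // view over the same array.
  val viaExplicitWrap = sum(ArraySeq.unsafeWrapArray(data))

  assert(viaImplicit == viaExplicitWrap)
}
```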

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43670 from LuciferYang/SPARK-45686-2.

Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
LuciferYang authored and srowen committed Nov 11, 2023
1 parent 917947e commit 0a79199
Showing 417 changed files with 1,709 additions and 1,069 deletions.
@@ -37,6 +37,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.util.ArrayImplicits._

/** Provides utility functions to be used inside SparkSubmit. */
private[spark] object MavenUtils extends Logging {
@@ -113,7 +114,7 @@ private[spark] object MavenUtils extends Logging {
s"The version cannot be null or " +
s"be whitespace. The version provided is: ${splits(2)}")
new MavenCoordinate(splits(0), splits(1), splits(2))
}
}.toImmutableArraySeq
}

/** Path of the local Maven cache. */
@@ -222,7 +223,7 @@
}
cacheDirectory.getAbsolutePath + File.separator +
s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}$classifier.jar"
}
}.toImmutableArraySeq
}

/** Adds the given maven coordinates to Ivy's module descriptor. */
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.SerializableConfiguration

case class AvroScan(
@@ -59,7 +60,7 @@
readDataSchema,
readPartitionSchema,
parsedOptions,
pushedFilters)
pushedFilters.toImmutableArraySeq)
}

override def equals(obj: Any): Boolean = obj match {
@@ -71,6 +72,6 @@
override def hashCode(): Int = super.hashCode()

override def getMetaData(): Map[String, String] = {
super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters.toImmutableArraySeq))
}
}
@@ -35,6 +35,7 @@ import org.apache.spark.sql.sources.{EqualTo, Not}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ArrayImplicits._

class AvroCatalystDataConversionSuite extends SparkFunSuite
with SharedSparkSession
@@ -90,7 +91,8 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
// Spark byte and short both map to avro int
case b: Byte => b.toInt
case s: Short => s.toInt
case row: GenericInternalRow => InternalRow.fromSeq(row.values.map(prepareExpectedResult))
case row: GenericInternalRow =>
InternalRow.fromSeq(row.values.map(prepareExpectedResult).toImmutableArraySeq)
case array: GenericArrayData => new GenericArrayData(array.array.map(prepareExpectedResult))
case map: MapData =>
val keys = new GenericArrayData(
@@ -23,6 +23,7 @@ import org.scalatest.Assertions

import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.util.SparkStringUtils.sideBySide
import org.apache.spark.util.ArrayImplicits._

abstract class QueryTest extends RemoteSparkSession {

@@ -43,7 +44,11 @@ abstract class QueryTest extends RemoteSparkSession {
}

protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = {
checkAnswer(df, expectedAnswer.collect())
checkAnswer(df, expectedAnswer.collect().toImmutableArraySeq)
}

protected def checkAnswer(df: => DataFrame, expectedAnswer: Array[Row]): Unit = {
checkAnswer(df, expectedAnswer.toImmutableArraySeq)
}

/**
@@ -32,6 +32,7 @@ import org.apache.spark.sql.connect.client.GrpcRetryHandler.RetryPolicy
import org.apache.spark.sql.connect.client.SparkConnectClient
import org.apache.spark.sql.connect.common.config.ConnectCommon
import org.apache.spark.sql.test.IntegrationTestUtils._
import org.apache.spark.util.ArrayImplicits._

/**
* An util class to start a local spark connect server in a different process for local E2E tests.
@@ -175,7 +176,7 @@ object SparkConnectServerUtils {
(fileName.startsWith("scalatest") || fileName.startsWith("scalactic"))
}
.map(e => Paths.get(e).toUri)
spark.client.artifactManager.addArtifacts(jars)
spark.client.artifactManager.addArtifacts(jars.toImmutableArraySeq)
}

def createSparkSession(): SparkSession = {
@@ -39,6 +39,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.AddArtifactsResponse
import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
import org.apache.spark.util.{MavenUtils, SparkFileUtils, SparkThreadUtils}
import org.apache.spark.util.ArrayImplicits._

/**
* The Artifact Manager is responsible for handling and transferring artifacts from the local
@@ -392,7 +393,7 @@ object Artifact {

val exclusionsList: Seq[String] =
if (!StringUtils.isBlank(exclusions)) {
exclusions.split(",")
exclusions.split(",").toImmutableArraySeq
} else {
Nil
}
@@ -938,7 +938,7 @@ class SparkConnectPlanner(

private def transformPythonTableFunction(fun: proto.PythonUDTF): SimplePythonFunction = {
SimplePythonFunction(
command = fun.getCommand.toByteArray,
command = fun.getCommand.toByteArray.toImmutableArraySeq,
// Empty environment variables
envVars = Maps.newHashMap(),
pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
@@ -1030,7 +1030,7 @@

if (!rel.hasValues) {
Unpivot(
Some(ids.map(_.named)),
Some(ids.map(_.named).toImmutableArraySeq),
None,
None,
rel.getVariableColumnName,
@@ -1042,8 +1042,8 @@
}

Unpivot(
Some(ids.map(_.named)),
Some(values.map(v => Seq(v.named))),
Some(ids.map(_.named).toImmutableArraySeq),
Some(values.map(v => Seq(v.named)).toImmutableArraySeq),
None,
rel.getVariableColumnName,
Seq(rel.getValueColumnName),
@@ -1624,7 +1624,7 @@ class SparkConnectPlanner(

private def transformPythonFunction(fun: proto.PythonUDF): SimplePythonFunction = {
SimplePythonFunction(
command = fun.getCommand.toByteArray,
command = fun.getCommand.toByteArray.toImmutableArraySeq,
// Empty environment variables
envVars = Maps.newHashMap(),
pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
@@ -24,6 +24,7 @@ import io.grpc.netty.NettyServerBuilder

import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.sql.connect.config.Connect
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils

/**
@@ -64,7 +65,8 @@ object SparkConnectInterceptorRegistry {
.map(_.trim)
.filter(_.nonEmpty)
.map(Utils.classForName[ServerInterceptor](_))
.map(createInstance(_))
.map(createInstance)
.toImmutableArraySeq
} else {
Seq.empty
}
@@ -33,6 +33,7 @@ import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.connect.planner.{PythonStreamingQueryListener, StreamingForeachBatchHelper}
import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper.RunnerCleaner
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.ArrayImplicits._

class SparkConnectSessionHolderSuite extends SharedSparkSession {

@@ -160,7 +161,7 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
s"${IntegratedUDFTestUtils.pysparkPythonPath}:${IntegratedUDFTestUtils.pythonPath}"

SimplePythonFunction(
command = fcn(sparkPythonPath),
command = fcn(sparkPythonPath).toImmutableArraySeq,
envVars = mutable.Map("PYTHONPATH" -> sparkPythonPath).asJava,
pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
pythonExec = IntegratedUDFTestUtils.pythonExec,
@@ -34,6 +34,7 @@ import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.kafka010.MockedSystemClock.currentMockSystemTime
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
import org.apache.spark.util.ArrayImplicits._

/**
* A [[MicroBatchStream]] that reads data from Kafka.
@@ -131,7 +132,7 @@ private[kafka010] class KafkaMicroBatchStream(
}

val limits: Seq[ReadLimit] = readLimit match {
case rows: CompositeReadLimit => rows.getReadLimits
case rows: CompositeReadLimit => rows.getReadLimits.toImmutableArraySeq
case rows => Seq(rows)
}

@@ -34,6 +34,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
import org.apache.spark.util.ArrayImplicits._

/**
* This class uses Kafka's own [[Admin]] API to read data offsets from Kafka.
@@ -488,7 +489,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
}
KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
}
rangeCalculator.getRanges(ranges, getSortedExecutorList)
rangeCalculator.getRanges(ranges, getSortedExecutorList.toImmutableArraySeq)
}

private def partitionsAssignedToAdmin(
@@ -32,6 +32,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner}
import org.apache.spark.util.ArrayImplicits._

/**
* This class uses Kafka's own [[org.apache.kafka.clients.consumer.KafkaConsumer]] API to
@@ -535,7 +536,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
}
KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
}
rangeCalculator.getRanges(ranges, getSortedExecutorList())
rangeCalculator.getRanges(ranges, getSortedExecutorList().toImmutableArraySeq)
}

private def partitionsAssignedToConsumer(
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.types._
import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.ArrayImplicits._

/**
* A [[Source]] that reads data from Kafka using the following design.
@@ -180,7 +181,7 @@ private[kafka010] class KafkaSource(
latestPartitionOffsets = if (latest.isEmpty) None else Some(latest)

val limits: Seq[ReadLimit] = limit match {
case rows: CompositeReadLimit => rows.getReadLimits
case rows: CompositeReadLimit => rows.getReadLimits.toImmutableArraySeq
case rows => Seq(rows)
}

@@ -41,6 +41,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._

/**
* The provider class for all Kafka readers and writers. It is designed such that it throws
@@ -204,7 +205,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
case (ASSIGN, value) =>
AssignStrategy(JsonUtils.partitions(value))
case (SUBSCRIBE, value) =>
SubscribeStrategy(value.split(",").map(_.trim()).filter(_.nonEmpty))
SubscribeStrategy(value.split(",").map(_.trim()).filter(_.nonEmpty).toImmutableArraySeq)
case (SUBSCRIBE_PATTERN, value) =>
SubscribePatternStrategy(value.trim())
case _ =>
@@ -2410,7 +2410,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {

private def sendMessagesWithTimestamp(
topic: String,
msgs: Seq[String],
msgs: Array[String],
part: Int,
ts: Long): Unit = {
val records = msgs.map { msg =>
@@ -39,6 +39,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.ArrayImplicits._

abstract class KafkaSinkSuiteBase extends QueryTest with SharedSparkSession with KafkaTest {
protected var testUtils: KafkaTestUtils = _
@@ -364,7 +365,7 @@ class KafkaContinuousSinkSuite extends KafkaSinkStreamingSuiteBase {
try {
val fieldTypes: Array[DataType] = Array(BinaryType)
val converter = UnsafeProjection.create(fieldTypes)
val row = new SpecificInternalRow(fieldTypes)
val row = new SpecificInternalRow(fieldTypes.toImmutableArraySeq)
row.update(0, data)
val iter = Seq.fill(1000)(converter.apply(row)).iterator
iter.foreach(writeTask.write(_))
@@ -580,7 +581,7 @@ class KafkaSinkBatchSuiteV2 extends KafkaSinkBatchSuiteBase {
try {
val fieldTypes: Array[DataType] = Array(BinaryType)
val converter = UnsafeProjection.create(fieldTypes)
val row = new SpecificInternalRow(fieldTypes)
val row = new SpecificInternalRow(fieldTypes.toImmutableArraySeq)
row.update(0, data)
val iter = Seq.fill(1000)(converter.apply(row)).iterator
writeTask.execute(iter)
@@ -54,6 +54,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaTokenUtil
import org.apache.spark.util.{SecurityUtils, ShutdownHookManager, Utils}
import org.apache.spark.util.ArrayImplicits._

/**
* This is a helper class for Kafka test suites. This has the functionality to set up
@@ -440,6 +441,10 @@ class KafkaTestUtils(
offsets
}

def sendMessages(msgs: Array[ProducerRecord[String, String]]): Seq[(String, RecordMetadata)] = {
sendMessages(msgs.toImmutableArraySeq)
}

def cleanupLogs(): Unit = {
server.logManager.cleanupLogs()
}
@@ -29,6 +29,7 @@ import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.ArrayImplicits._

/**
* A batch-oriented interface for consuming from Kafka.
@@ -135,7 +136,7 @@ private[spark] class KafkaRDD[K, V](
context.runJob(
this,
(tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
it.take(parts(tc.partitionId())).toArray, parts.keys.toArray
it.take(parts(tc.partitionId())).toArray, parts.keys.toArray.toImmutableArraySeq
).flatten
}
}
@@ -33,6 +33,7 @@ import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils

/**
@@ -287,7 +288,8 @@ private[kinesis] class KinesisReceiver[T](
* for next block. Internally, this is synchronized with `rememberAddedRange()`.
*/
private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
blockIdToSeqNumRanges.put(blockId, SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray))
blockIdToSeqNumRanges.put(blockId,
SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray.toImmutableArraySeq))
seqNumRangesInCurrentBlock.clear()
logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
}