From d481fa588f18776cfc333fefac0ccd0730acb0cd Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Thu, 30 May 2024 12:11:54 -0700
Subject: [PATCH] Create dataframe from list instead of RDD

Signed-off-by: Peng Huo
---
 .../org/apache/spark/sql/FlintJobExecutor.scala | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
index 665ec5a27..23abf8d6b 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
@@ -7,6 +7,8 @@ package org.apache.spark.sql
 
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
 import com.amazonaws.services.glue.model.{AccessDeniedException, AWSGlueException}
 import com.amazonaws.services.s3.model.AmazonS3Exception
 import com.fasterxml.jackson.databind.ObjectMapper
@@ -177,12 +179,16 @@ trait FlintJobExecutor {
       startTime: Long,
       timeProvider: TimeProvider,
       cleaner: Cleaner): DataFrame = {
-    // Create the schema dataframe
-    val schemaRows = result.schema.fields.map { field =>
-      Row(field.name, field.dataType.typeName)
-    }
+    // collect schema rows as List
+    val schemaRows = result.schema.fields
+      .map { field =>
+        Row(field.name, field.dataType.typeName)
+      }
+      .toList
+      .asJava
+
     val resultSchema = spark.createDataFrame(
-      spark.sparkContext.parallelize(schemaRows),
+      schemaRows,
       StructType(
         Seq(
           StructField("column_name", StringType, nullable = false),
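
Note (not part of the patch): a minimal standalone sketch of the technique this change adopts. Spark's SparkSession exposes a createDataFrame overload that takes a java.util.List[Row] plus a StructType, so a small local collection can become a DataFrame without routing through spark.sparkContext.parallelize and an RDD. The object name, master setting, and sample rows below are illustrative assumptions, not code from the Flint repository.

    import scala.collection.JavaConverters._

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object CreateFromListExample {
      def main(args: Array[String]): Unit = {
        // Assumption: a local session purely for demonstration.
        val spark = SparkSession
          .builder()
          .appName("create-dataframe-from-list")
          .master("local[*]")
          .getOrCreate()

        // Same two-column schema shape as the patched code builds
        // for its schema DataFrame.
        val schema = StructType(
          Seq(
            StructField("column_name", StringType, nullable = false),
            StructField("data_type", StringType, nullable = false)))

        // Hypothetical sample rows; the patch derives these from
        // result.schema.fields instead.
        val rows = Seq(
          Row("status", "string"),
          Row("count", "long")).asJava

        // The overload the patch switches to:
        // createDataFrame(rows: java.util.List[Row], schema: StructType).
        // The list stays a local relation, so no RDD is materialized
        // just to hold a handful of schema rows.
        val df = spark.createDataFrame(rows, schema)
        df.show()

        spark.stop()
      }
    }

For a few metadata rows like these, building from a local list avoids scheduling distributed work that parallelize would otherwise imply, which appears to be the motivation behind the commit.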