
Commit d481fa5: Create dataframe from list instead of RDD
Signed-off-by: Peng Huo <[email protected]>
penghuo committed May 30, 2024
1 parent a4f95c5 commit d481fa5
Showing 1 changed file with 11 additions and 5 deletions.
@@ -7,6 +7,8 @@ package org.apache.spark.sql
 
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
 import com.amazonaws.services.glue.model.{AccessDeniedException, AWSGlueException}
 import com.amazonaws.services.s3.model.AmazonS3Exception
 import com.fasterxml.jackson.databind.ObjectMapper
@@ -177,12 +179,16 @@ trait FlintJobExecutor {
       startTime: Long,
       timeProvider: TimeProvider,
       cleaner: Cleaner): DataFrame = {
-    // Create the schema dataframe
-    val schemaRows = result.schema.fields.map { field =>
-      Row(field.name, field.dataType.typeName)
-    }
+    // collect schema rows as List
+    val schemaRows = result.schema.fields
+      .map { field =>
+        Row(field.name, field.dataType.typeName)
+      }
+      .toList
+      .asJava
+
     val resultSchema = spark.createDataFrame(
-      spark.sparkContext.parallelize(schemaRows),
+      schemaRows,
       StructType(
         Seq(
           StructField("column_name", StringType, nullable = false),
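
For context, the change switches from the createDataFrame(RDD[Row], StructType) overload, which requires first distributing the rows via spark.sparkContext.parallelize, to the createDataFrame(java.util.List[Row], StructType) overload, which builds the small schema DataFrame directly on the driver. The sketch below is a minimal, self-contained illustration of that pattern, not code from the repository: the SparkSession setup, the object name SchemaDataFrameSketch, and the example result schema are assumptions made for the example.

import scala.collection.JavaConverters._

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical standalone example; FlintJobExecutor receives its SparkSession and result DataFrame.
object SchemaDataFrameSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("schema-df-sketch")
      .getOrCreate()

    // Example schema standing in for `result.schema` in the commit.
    val resultSchema = StructType(Seq(
      StructField("id", StringType, nullable = false),
      StructField("name", StringType, nullable = true)))

    // Collect (column_name, data_type) rows as a java.util.List, as the commit does.
    val schemaRows = resultSchema.fields
      .map(field => Row(field.name, field.dataType.typeName))
      .toList
      .asJava

    // createDataFrame(java.util.List[Row], StructType) builds the DataFrame on the driver,
    // avoiding the round trip through spark.sparkContext.parallelize.
    val schemaDf = spark.createDataFrame(
      schemaRows,
      StructType(Seq(
        StructField("column_name", StringType, nullable = false),
        StructField("data_type", StringType, nullable = false))))

    schemaDf.show()
    spark.stop()
  }
}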
