Commit

⚡ Remove limit parameter from readers and improve performance for testing service.

sinaci committed Oct 10, 2024
1 parent 71234b7 commit 53f07e4
Showing 14 changed files with 205 additions and 180 deletions.
@@ -9,29 +9,29 @@ import java.time.LocalDateTime
/**
* Base data source reader
*/
abstract class BaseDataSourceReader[T <: MappingSourceBinding, S<:MappingJobSourceSettings] {
abstract class BaseDataSourceReader[T <: MappingSourceBinding, S <: MappingJobSourceSettings] {

/**
* Read the source data for the given task
* @param mappingSourceBinding Configuration information for the mapping source
* @param mappingJobSourceSettings Common settings for the source system
* @param schema Schema for the source data
* @param timeRange Time range for the data to read if given
* @param limit Limit the number of rows to read
* @param jobId The identifier of mapping job which executes the mapping
*
* @param mappingSourceBinding Configuration information for the mapping source
* @param mappingJobSourceSettings Common settings for the source system
* @param schema Schema for the source data
* @param timeRange Time range for the data to read if given
* @param jobId The identifier of mapping job which executes the mapping
* @return
*/
def read(mappingSourceBinding: T, mappingJobSourceSettings:S, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)] = Option.empty, limit: Option[Int], jobId: Option[String]): DataFrame
def read(mappingSourceBinding: T, mappingJobSourceSettings: S, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], jobId: Option[String]): DataFrame

/**
* Whether this reader needs a data type validation for columns after reading the source
*/
val needTypeValidation:Boolean = false
val needTypeValidation: Boolean = false

/**
* Whether this reader needs cardinality validation for columns after reading the source
*/
val needCardinalityValidation:Boolean = true
val needCardinalityValidation: Boolean = true

}

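With the limit parameter removed from the abstract read signature, a caller that still needs a bounded preview, such as the testing service, can apply the limit on the returned DataFrame instead. A minimal sketch, assuming the MappingSourceBinding, MappingJobSourceSettings and BaseDataSourceReader types declared above; the readPreview helper and its 100-row default are illustrative, not part of this commit:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Hypothetical helper, not part of this commit: limiting now happens on the
// DataFrame returned by the reader rather than inside the reader itself.
def readPreview[T <: MappingSourceBinding, S <: MappingJobSourceSettings](
    reader: BaseDataSourceReader[T, S],
    binding: T,
    settings: S,
    schema: Option[StructType],
    previewRows: Int = 100): DataFrame = {
  // timeRange and jobId are passed explicitly because the abstract signature declares no defaults
  val df = reader.read(binding, settings, schema, timeRange = Option.empty, jobId = Option.empty)
  df.limit(previewRows) // bounded preview, e.g. for testing a mapping
}
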
@@ -22,17 +22,17 @@ class FhirServerDataSourceReader(spark: SparkSession) extends BaseDataSourceRead
/**
* Reads data from the specified FHIR server source.
*
* @param mappingSourceBinding Configuration information for the mapping source.
* @param mappingSourceBinding Configuration information for the mapping source.
* @param mappingJobSourceSettings Source settings of the mapping job for the FHIR server.
* @param schema Optional schema for the source data.
* @param timeRange Optional time range to filter the data.
* @param limit Optional limit on the number of rows to read.
* @param jobId Optional identifier of the mapping job executing the read operation.
* @param schema Optional schema for the source data.
* @param timeRange Optional time range to filter the data.
* @param jobId Optional identifier of the mapping job executing the read operation.
* @return A DataFrame containing the source data from the FHIR server.
* @throws IllegalArgumentException If the path is not a directory for streaming jobs.
* @throws NotImplementedError If the specified source format is not implemented.
*/
override def read(mappingSourceBinding: FhirServerSource, mappingJobSourceSettings: FhirServerSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
override def read(mappingSourceBinding: FhirServerSource, mappingJobSourceSettings: FhirServerSourceSettings, schema: Option[StructType],
timeRange: Option[(LocalDateTime, LocalDateTime)] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
// extract Spark option for the authentication from the given source settings
val authenticationOptions = extractAuthenticationOptions(mappingJobSourceSettings)

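The remainder of this method is collapsed in the diff. Purely as an illustration of the reworked signature (not code from this commit), a plain batch read now needs only the binding, the settings and an optional schema, because timeRange and jobId default to Option.empty:

import org.apache.spark.sql.DataFrame

// Illustrative wrapper relying on the new defaults for timeRange and jobId.
def readAllFromFhirServer(reader: FhirServerDataSourceReader,
                          source: FhirServerSource,
                          settings: FhirServerSourceSettings): DataFrame =
  reader.read(source, settings, schema = None)
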
@@ -20,24 +20,25 @@ import scala.collection.mutable
class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[FileSystemSource, FileSystemSourceSettings] {

private val logger: Logger = Logger(this.getClass)

/**
* Read the source data
* Read the source data from file system
*
* @param mappingSourceBinding Configuration information for the mapping source
* @param mappingJobSourceSettings Common settings for the source system
* @param schema Optional schema for the source
* @param timeRange Time range for the data to read if given
* @param limit Limit the number of rows to read
* @param jobId The identifier of mapping job which executes the mapping
* @param mappingSourceBinding Configuration information for the mapping source
* @param mappingJobSourceSettings Common settings for the source system
* @param schema Optional schema for the source
* @param timeRange Time range for the data to read if given
* @param jobId The identifier of mapping job which executes the mapping
* @return
* @throws IllegalArgumentException If the path is not a directory for streaming jobs.
* @throws NotImplementedError If the specified source format is not implemented.
*/
override def read(mappingSourceBinding: FileSystemSource, mappingJobSourceSettings:FileSystemSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
override def read(mappingSourceBinding: FileSystemSource, mappingJobSourceSettings: FileSystemSourceSettings, schema: Option[StructType],
timeRange: Option[(LocalDateTime, LocalDateTime)] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
// get the content type for the file
val contentType = mappingSourceBinding.contentType
// check whether it is a zip file
val isZipFile = mappingSourceBinding.path.endsWith(".zip");
val isZipFile = mappingSourceBinding.path.endsWith(".zip")
// determine the final path
// if it is a Hadoop path (starts with "hdfs://"), construct the URI directly without adding the context path
val finalPath = if (mappingJobSourceSettings.dataFolderPath.startsWith("hdfs://")) {
@@ -46,97 +47,95 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
FileUtils.getPath(mappingJobSourceSettings.dataFolderPath, mappingSourceBinding.path).toAbsolutePath.toString
}
// validate whether the provided path is a directory when streaming is enabled in the source settings
if(mappingJobSourceSettings.asStream && !new File(finalPath).isDirectory){
if (mappingJobSourceSettings.asStream && !new File(finalPath).isDirectory) {
throw new IllegalArgumentException(s"$finalPath is not a directory. For streaming job, you should provide a directory.")
}

val isDistinct = mappingSourceBinding.options.get("distinct").contains("true")

// keeps the names of processed files by Spark
val processedFiles: mutable.HashSet[String] =mutable.HashSet.empty
val processedFiles: mutable.HashSet[String] = mutable.HashSet.empty
//Based on source type
val resultDf = contentType match {
case SourceContentTypes.CSV | SourceContentTypes.TSV =>
val updatedOptions = contentType match {
case SourceContentTypes.TSV =>
// If the file format is tsv, use tab (\t) as separator by default if it is not set explicitly
mappingSourceBinding.options +
("sep" -> mappingSourceBinding.options.getOrElse("sep", "\\t"))
case SourceContentTypes.CSV => mappingSourceBinding.options
}
case SourceContentTypes.CSV | SourceContentTypes.TSV =>
val updatedOptions = contentType match {
case SourceContentTypes.TSV =>
// If the file format is tsv, use tab (\t) as separator by default if it is not set explicitly
mappingSourceBinding.options +
("sep" -> mappingSourceBinding.options.getOrElse("sep", "\\t"))
case SourceContentTypes.CSV => mappingSourceBinding.options
}

//Options that we infer for csv
val inferSchema = schema.isEmpty || mappingSourceBinding.preprocessSql.isDefined
val csvSchema = if (mappingSourceBinding.preprocessSql.isDefined) None else schema
//val enforceSchema = schema.isDefined
val includeHeader = mappingSourceBinding.options.get("header").forall(_ == "true")
//Other options except header, inferSchema and enforceSchema
val otherOptions = updatedOptions.filterNot(o => o._1 == "header" || o._1 == "inferSchema" || o._1 == "enforceSchema")
if (mappingJobSourceSettings.asStream) {
spark.readStream
.option("enforceSchema", false) //Enforce schema should be false (See https://spark.apache.org/docs/latest/sql-data-sources-csv.html)
.option("header", includeHeader)
.option("inferSchema", inferSchema)
.options(otherOptions)
.schema(csvSchema.orNull)
.csv(finalPath)
// add a dummy column called 'filename' using a udf function to print a log when the data reading is
// started for a file.
.withColumn("filename", logStartOfDataReading(processedFiles, logger = logger, jobId = jobId)(input_file_name))
} else if (isZipFile) {
val unzippedFileContents = SparkUtil.readZip(finalPath, spark)
spark.read
.option("enforceSchema", false) //Enforce schema should be false
.option("header", includeHeader)
.option("inferSchema", inferSchema)
.options(otherOptions)
.schema(csvSchema.orNull)
.csv(unzippedFileContents)
} else
spark.read
.option("enforceSchema", false) //Enforce schema should be false
.option("header", includeHeader)
.option("inferSchema", inferSchema)
.options(otherOptions)
.schema(csvSchema.orNull)
.csv(finalPath)
// assume that each line in the txt files contains a separate JSON object.
case SourceContentTypes.JSON | SourceContentTypes.NDJSON =>
if (mappingJobSourceSettings.asStream)
spark.readStream.options(mappingSourceBinding.options).schema(schema.orNull).json(finalPath)
// add a dummy column called 'filename' to print a log when the data reading is started for a file
.withColumn("filename", logStartOfDataReading(processedFiles, logger = logger, jobId = jobId)(input_file_name))
else if (isZipFile) {
val unzippedFileContents = SparkUtil.readZip(finalPath, spark)
spark.read.options(mappingSourceBinding.options).schema(schema.orNull).json(unzippedFileContents)
}
else
spark.read.options(mappingSourceBinding.options).schema(schema.orNull).json(finalPath)
case SourceContentTypes.PARQUET =>
if (mappingJobSourceSettings.asStream)
spark.readStream.options(mappingSourceBinding.options).schema(schema.orNull).parquet(finalPath)
// add a dummy column called 'filename' to print a log when the data reading is started for a file
.withColumn("filename", logStartOfDataReading(processedFiles, logger = logger, jobId = jobId)(input_file_name))
else
spark.read.options(mappingSourceBinding.options).schema(schema.orNull).parquet(finalPath)
case _ => throw new NotImplementedError()
}

if (isDistinct) resultDf.distinct() else resultDf

//Options that we infer for csv
val inferSchema = schema.isEmpty || mappingSourceBinding.preprocessSql.isDefined
val csvSchema = if(mappingSourceBinding.preprocessSql.isDefined) None else schema
//val enforceSchema = schema.isDefined
val includeHeader = mappingSourceBinding.options.get("header").forall(_ == "true")
//Other options except header, inferSchema and enforceSchema
val otherOptions = updatedOptions.filterNot(o => o._1 == "header" || o._1 == "inferSchema" || o._1 == "enforceSchema")
if(mappingJobSourceSettings.asStream)
spark.readStream
.option("enforceSchema", false) //Enforce schema should be false (See https://spark.apache.org/docs/latest/sql-data-sources-csv.html)
.option("header", includeHeader)
.option("inferSchema", inferSchema)
.options(otherOptions)
.schema(csvSchema.orNull)
.csv(finalPath)
// add a dummy column called 'filename' using a udf function to print a log when the data reading is
// started for a file.
.withColumn("filename",logStartOfDataReading(processedFiles,logger = logger,jobId = jobId)(input_file_name))
else if(isZipFile) {
import spark.implicits._
val unzippedFile = SparkUtil.readZip(finalPath, spark.sparkContext);
spark.read
.option("enforceSchema", false) //Enforce schema should be false
.option("header", includeHeader)
.option("inferSchema", inferSchema)
.options(otherOptions)
.schema(csvSchema.orNull)
.csv(unzippedFile.toDS())
} else
spark.read
.option("enforceSchema", false) //Enforce schema should be false
.option("header", includeHeader)
.option("inferSchema", inferSchema)
.options(otherOptions)
.schema(csvSchema.orNull)
.csv(finalPath)
// assume that each line in the txt files contains a separate JSON object.
case SourceContentTypes.JSON | SourceContentTypes.NDJSON =>
if(mappingJobSourceSettings.asStream)
spark.readStream.options(mappingSourceBinding.options).schema(schema.orNull).json(finalPath)
// add a dummy column called 'filename' to print a log when the data reading is started for a file
.withColumn("filename", logStartOfDataReading(processedFiles, logger = logger, jobId = jobId)(input_file_name))
else if(isZipFile){
import spark.implicits._
val unzippedFile = SparkUtil.readZip(finalPath, spark.sparkContext);
spark.read.options(mappingSourceBinding.options).schema(schema.orNull).json(unzippedFile.toDS())
}
else
spark.read.options(mappingSourceBinding.options).schema(schema.orNull).json(finalPath)
case SourceContentTypes.PARQUET =>
if(mappingJobSourceSettings.asStream)
spark.readStream.options(mappingSourceBinding.options).schema(schema.orNull).parquet(finalPath)
// add a dummy column called 'filename' to print a log when the data reading is started for a file
.withColumn("filename", logStartOfDataReading(processedFiles, logger = logger, jobId = jobId)(input_file_name))
else
spark.read.options(mappingSourceBinding.options).schema(schema.orNull).parquet(finalPath)
case _ => throw new NotImplementedError()
}
if(isDistinct)
resultDf.distinct()
else
resultDf
}

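As a standalone sketch of the CSV/TSV branch above: when the content type is TSV and no explicit separator is configured, the options map gets a tab as the default "sep" before being handed to Spark's CSV reader. The function name, the content-type strings and the inferSchema choice below are illustrative assumptions, not project API:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Minimal sketch of the separator defaulting for delimited files.
def readDelimited(spark: SparkSession,
                  contentType: String,
                  options: Map[String, String],
                  path: String): DataFrame = {
  // For TSV, default the separator to a tab unless the caller set "sep" explicitly.
  val updatedOptions =
    if (contentType == "tsv") options + ("sep" -> options.getOrElse("sep", "\t"))
    else options
  spark.read
    .option("inferSchema", true) // no explicit schema in this sketch, so let Spark infer one
    .options(updatedOptions)
    .csv(path)
}
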
/**
* A user-defined function i.e. udf to print a log when data reading is started for a file. udf takes the
* name of input file being read and returns it after logging a message to indicate that data reading is started and
* it may take a while. It makes use of the given processedFiles set to decide whether to print a log. If it does not
* contain the file name i.e. Spark just started to process it, the log is printed.
*
* @param processedFiles The set of file names processed by Spark
* @param logger Logger instance
* @param jobId The identifier of mapping job which executes the mapping
* @param logger Logger instance
* @param jobId The identifier of mapping job which executes the mapping
* @return a user-defined function to print a log when data reading is started for a file
* */
private def logStartOfDataReading(processedFiles: mutable.HashSet[String], logger: Logger, jobId: Option[String]) = udf((fileName: String) => {
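The body of logStartOfDataReading is collapsed at this point in the diff. As a hedged sketch of this kind of log-once-per-file UDF (deliberately renamed logFileReadStart to make clear it is not the project's actual implementation, and with an assumed log message):

import com.typesafe.scalalogging.Logger
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import scala.collection.mutable

// Logs once per file name and passes the name through so the dummy 'filename'
// column stays populated; the message text is an assumption.
def logFileReadStart(processedFiles: mutable.HashSet[String],
                     logger: Logger,
                     jobId: Option[String]): UserDefinedFunction =
  udf((fileName: String) => {
    if (!processedFiles.contains(fileName)) {
      processedFiles.add(fileName)
      logger.info(s"Job ${jobId.getOrElse("<unknown>")} started reading file '$fileName'; this may take a while...")
    }
    fileName
  })
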
@@ -19,15 +19,17 @@ import javax.ws.rs.InternalServerErrorException
class KafkaSourceReader(spark: SparkSession) extends BaseDataSourceReader[KafkaSource, KafkaSourceSettings] {

/**
* Read the source data for the given task
* Read the source data for the given task from Kafka
*
* @param mappingSourceBinding Configuration information for mapping source
* @param schema Schema for the source
* @param limit Limit the number of rows to read
* @param jobId The identifier of mapping job which executes the mapping
* @param mappingSourceBinding Configuration information for the mapping source
* @param mappingJobSourceSettings Common settings for the source system
* @param schema Schema for the source data
* @param timeRange Time range for the data to read if given
* @param jobId The identifier of mapping job which executes the mapping
* @return
*/
override def read(mappingSourceBinding: KafkaSource, mappingJobSourceSettings: KafkaSourceSettings, schema: Option[StructType] = Option.empty, timeRange: Option[(LocalDateTime, LocalDateTime)] = Option.empty, limit: Option[Int] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
override def read(mappingSourceBinding: KafkaSource, mappingJobSourceSettings: KafkaSourceSettings, schema: Option[StructType] = Option.empty,
timeRange: Option[(LocalDateTime, LocalDateTime)] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
import spark.implicits._

if (schema.isEmpty) {
@@ -108,7 +110,7 @@ class KafkaSourceReader(spark: SparkSession) extends BaseDataSourceReader[KafkaS
// try to extract as string
case None => field._2.extractOpt[String] match {
// matches JString("0") or matches JString("1")
case Some(value) if value.contentEquals("0") || value.contentEquals("1") => JBool(if(value.contentEquals("0")) false else true)
case Some(value) if value.contentEquals("0") || value.contentEquals("1") => JBool(if (value.contentEquals("0")) false else true)
// matches JString(v)
case Some(value) if value.nonEmpty => JBool(value.toBoolean)
// matches JString()
@@ -125,7 +127,7 @@ class KafkaSourceReader(spark: SparkSession) extends BaseDataSourceReader[KafkaS
}).toJson
})

val df = spark
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", mappingJobSourceSettings.bootstrapServers)
@@ -138,7 +140,5 @@ class KafkaSourceReader(spark: SparkSession) extends BaseDataSourceReader[KafkaS
.withColumn("value", processDataUDF(col("value"))) // replace 'value' column with the processed data
.select(from_json($"value", schema.get).as("record"))
.select("record.*")
df

}
}
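For readers less familiar with the streaming pattern used above, a minimal standalone sketch of reading JSON payloads from Kafka and flattening them against a schema; the broker address, topic and offset choice are placeholders, and the spark-sql-kafka connector is assumed to be on the classpath:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

// Read raw Kafka records as a stream, parse the JSON payload in 'value' against
// the given schema and flatten the resulting struct into top-level columns.
def readKafkaJson(spark: SparkSession, schema: StructType): DataFrame = {
  import spark.implicits._
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
    .option("subscribe", "source-topic")                  // placeholder topic
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING) as value") // Kafka delivers bytes; cast the payload to string
    .select(from_json($"value", schema).as("record"))
    .select("record.*")
}
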