Skip to content

Commit

Permalink
Merge pull request #239 from srdc/multiple-source-log
Browse files Browse the repository at this point in the history
Multiple source log
  • Loading branch information
dogukan10 authored Oct 11, 2024
2 parents f213650 + 98134b1 commit 89dd362
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 55 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@

<!--Dependency versions-->
<scala-logging.version>3.9.5</scala-logging.version>
<onfhir.version>3.4-SNAPSHOT</onfhir.version>
<onfhir.version>3.3-SNAPSHOT</onfhir.version>
<onfhir-template-engine.version>1.1-SNAPSHOT</onfhir-template-engine.version>
<spark-on-fhir.version>1.0-SNAPSHOT</spark-on-fhir.version>
<json4s.version>3.7.0-M11</json4s.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package io.tofhir.engine.execution.processing

import io.tofhir.engine.model.{FhirMappingErrorCodes, FhirMappingJobExecution, FhirMappingResult}
import io.tofhir.engine.util.FileUtils.FileExtensions
import org.apache.hadoop.fs.FileUtil
import org.apache.spark.sql.functions.{col, from_json, schema_of_json}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

import java.io.File
import java.util
import io.onfhir.util.JsonFormatter._

/**
* This class persists [[FhirMappingResult]]s that produced some error during the mapping process.
Expand Down Expand Up @@ -46,33 +46,72 @@ object ErroneousRecordWriter {
}

/**
* Writes the dataset to the errorOutputDirectory. Directory structure:
* error-folder-path\<error-type>\job-<jobId>\execution-<executionId>\<mappingTaskName>\<random-generated-name-by-spark>.csv
* Writes the dataset to the error output directory based on the error type and job details.
* Each source (e.g., "mainSource") within the "source" field is extracted dynamically,
* and its data is written into a separate subdirectory under the output path. The directory structure is as follows:
*
* @param mappingJobExecution job execution to get output directory of data sources with errors
* @param dataset filtered dataset of data sources with errors to write to configured folder
* @param mappingTaskName to create directory for each mapping name within the job execution
* @param errorType one of invalid_input, mapping_error, invalid_resource
* error-folder-path\<error-type>\job-<jobId>\execution-<executionId>\<mappingTaskName>\<sourceName>\<random-generated-name-by-spark>.csv
*
* @param mappingJobExecution The job execution object, used to get the output directory of data sources with errors.
* @param dataset The filtered dataset of data sources with errors to be written to the configured folder.
* @param mappingTaskName The name of the mapping task, used to create a directory for each mapping within the job execution.
* @param errorType The type of error (e.g., invalid_input, mapping_error, invalid_resource).
*/
/**
 * Writes the erroneous dataset to the error output directory based on the error type and job details.
 * Each source (e.g., "mainSource", "secondarySource") within the JSON "source" field is extracted
 * dynamically, and its rows are written into a separate subdirectory under the output path:
 *
 * error-folder-path\<error-type>\job-<jobId>\execution-<executionId>\<mappingTaskName>\<sourceName>\<random-generated-name-by-spark>.csv
 *
 * If the dataset is empty there is nothing to persist and the method returns without touching the file system.
 *
 * @param mappingJobExecution The job execution, used to resolve the error output directory.
 * @param dataset             The filtered dataset of erroneous records to be written.
 * @param mappingTaskName     The mapping task name, used as a directory level within the job execution folder.
 * @param errorType           The error type (e.g., invalid_input, mapping_error, invalid_resource).
 */
private def writeErroneousDataset(mappingJobExecution: FhirMappingJobExecution,
                                  dataset: Dataset[FhirMappingResult],
                                  mappingTaskName: String,
                                  errorType: String): Unit = {
  val outputPath = mappingJobExecution.getErrorOutputDirectory(mappingTaskName, errorType)
  // sample a single record to infer the JSON schema of the "source" column;
  // headOption guards against an empty dataset (the original .head would throw)
  dataset.rdd.takeSample(withReplacement = false, num = 1).headOption.foreach { sampleResult =>
    val schema = schema_of_json(sampleResult.source)

    // extract each source name (e.g., "mainSource", "secondarySource") from the dataset's "source" field
    val jsonColumns: Array[String] = dataset
      .select("source")
      .rdd
      .flatMap(row => row.getAs[String]("source").parseJson.values.keys) // parse JSON and get all keys from the "source" map
      .distinct() // remove duplicate source names
      .collect()

    // for each source, create and write a separate CSV file under its own subdirectory
    jsonColumns.foreach { sourceKey =>
      // extract the source data for the given sourceKey and flatten the selected source field
      val sourceDataset = dataset
        .withColumn("jsonData", from_json(col("source"), schema))
        .selectExpr(s"jsonData.$sourceKey.*")

      // write the extracted source data as a separate CSV file into the directory for that sourceKey
      sourceDataset
        .coalesce(1)
        .write
        .mode(SaveMode.Append)
        .option("header", "true")
        .csv(s"$outputPath/$sourceKey")
    }

    // remove all files except the CSV files (to clean up Spark's .crc checksum files)
    deleteNonCsvFiles(new File(outputPath))
  }
}

/**
* Recursively deletes non-CSV files (like .crc files) from the directory.
*
* @param dir The directory to clean up by removing non-CSV files.
*/
/**
 * Recursively removes every non-CSV entry (e.g. Spark's .crc checksum files) under the given directory.
 * Does nothing when the path does not exist or is not a directory.
 *
 * @param dir The directory to clean up by removing non-CSV files.
 */
private def deleteNonCsvFiles(dir: File): Unit = {
  if (dir.exists() && dir.isDirectory) {
    // keep only the entries that are NOT CSV files, then dispose of each one
    val nonCsvEntries = dir.listFiles().filterNot(_.getPath.endsWith(FileExtensions.CSV.toString))
    nonCsvEntries.foreach {
      case regularFile if regularFile.isFile =>
        // plain file (e.g. a .crc checksum) — remove it
        regularFile.delete()
      case subDirectory if subDirectory.isDirectory =>
        // descend into the subdirectory and clean it the same way
        deleteNonCsvFiles(subDirectory)
      case _ =>
        // neither a regular file nor a directory — leave it untouched (matches original behavior)
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ object MappingTaskExecutor {
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
source = Serialization.write(jo),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.INVALID_INPUT,
description = validationError
Expand Down Expand Up @@ -149,7 +149,7 @@ object MappingTaskExecutor {
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
source = Serialization.write(jo),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.INVALID_INPUT,
description = validationErrors.mkString("\n")
Expand Down Expand Up @@ -189,7 +189,7 @@ object MappingTaskExecutor {
mappingTaskName = fhirMappingService.mappingTaskName,
mappedFhirResources = flattenedResources.map(r => MappedFhirResource(Some(r._1), Some(Serialization.write(r._2)), r._3)),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)),
source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
executionId = executionId,
projectId = fhirMappingService.projectId,
))
Expand All @@ -214,7 +214,7 @@ object MappingTaskExecutor {
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
mappedResource = Some(Serialization.write(JArray(resources.toList))),
fhirInteraction = fhirInteraction,
executionId = executionId,
Expand All @@ -228,7 +228,7 @@ object MappingTaskExecutor {
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
mappedResource = Some(Serialization.write(r)),
fhirInteraction = fhirInteraction,
executionId = executionId,
Expand Down Expand Up @@ -261,7 +261,7 @@ object MappingTaskExecutor {
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.MAPPING_ERROR,
description = errorDescription,
Expand All @@ -276,7 +276,7 @@ object MappingTaskExecutor {
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.MAPPING_ERROR,
description = ExceptionUtil.extractExceptionMessages(e)
Expand All @@ -290,7 +290,7 @@ object MappingTaskExecutor {
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.MAPPING_TIMEOUT,
description = s"A single row could not be mapped to FHIR in ${ToFhirConfig.engineConfig.mappingTimeout.toString}!"
Expand All @@ -304,7 +304,7 @@ object MappingTaskExecutor {
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
source = Some(Serialization.write(jo)),
source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.UNEXPECTED_PROBLEM,
description = "Exception:" + oth.getMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ case class FhirMappingResult(
mappingExpr:Option[String] = None,
timestamp:Timestamp,
mappedResource:Option[String] = None,
source:Option[String] = None,
source:String,
error:Option[FhirMappingError] = None,
fhirInteraction:Option[FhirInteraction] = None,
executionId: Option[String] = None,
Expand All @@ -39,7 +39,7 @@ case class FhirMappingResult(
final val eventId:String = "MAPPING_RESULT"
override def toString: String = {
s"Mapping failure (${error.get.code}) for job '$jobId' and mappingTask '$mappingTaskName'${mappingExpr.map(e => s" within expression '$e'").getOrElse("")} execution '${executionId.getOrElse("")}'!\n"+
s"\tSource: ${source.get}\n"+
s"\tSource: ${source}\n"+
s"\tError: ${error.get.description}" +
error.get.expression.map(e => s"\n\tExpression: $e").getOrElse("")
}
Expand All @@ -58,7 +58,7 @@ case class FhirMappingResult(
markerMap.put("executionId", executionId.getOrElse(""))
markerMap.put("mappingTaskName", mappingTaskName)
markerMap.put("mappingExpr", mappingExpr.orElse(null))
markerMap.put("source", source.get)
markerMap.put("source", source)
markerMap.put("errorCode", error.get.code)
markerMap.put("errorDesc", error.get.description)
markerMap.put("errorExpr", error.get.expression.getOrElse(""))
Expand Down
Loading

0 comments on commit 89dd362

Please sign in to comment.