Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ Refactor the repository interfaces and implementations to d… #257

Merged
merged 4 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.tofhir.common.env

object EnvironmentVariable extends Enumeration {
type EnvironmentVariable = Value
final val FHIR_REPO_URL = Value("FHIR_REPO_URL")
final val DATA_FOLDER_PATH= Value("DATA_FOLDER_PATH")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.tofhir.common.env

object EnvironmentVariableResolver {

def replaceEnvironmentVariables(fileContent: String): String = {
var returningContent = fileContent;
// val regex = """\$\{(.*?)\}""".r
EnvironmentVariable.values.foreach { e =>
val regex = "\\$\\{" + e.toString + "\\}"
if (sys.env.contains(e.toString)) returningContent = returningContent.replaceAll(regex, sys.env(e.toString))
}
returningContent
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.tofhir.engine.repository.mapping
package io.tofhir.common.model

/**
* A cached repository for the Mappings
* A cached repository.
*/
trait IFhirMappingCachedRepository extends IFhirMappingRepository {
trait ICachedRepository {
/**
* Invalidate the internal cache and refresh the cache with the FhirMappings directly from their source
sinaci marked this conversation as resolved.
Show resolved Hide resolved
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import io.onfhir.path._
import io.tofhir.engine.config.{ToFhirConfig, ToFhirEngineConfig}
import io.tofhir.engine.execution.RunningJobRegistry
import io.tofhir.engine.execution.processing.FileStreamInputArchiver
import io.tofhir.engine.mapping._
import io.tofhir.engine.mapping.context.{IMappingContextLoader, MappingContextLoader}
import io.tofhir.engine.mapping.schema.{IFhirSchemaLoader, SchemaFolderLoader}
import io.tofhir.engine.model.exception.EngineInitializationException
import io.tofhir.engine.repository.mapping.{FhirMappingFolderRepository, IFhirMappingCachedRepository}
import io.tofhir.engine.repository.mapping.{FhirMappingFolderRepository, IFhirMappingRepository}
import io.tofhir.engine.util.FileUtils
import org.apache.spark.sql.SparkSession

/**
* <p>tofhir Engine for executing mapping jobs and tasks.</p>
* <p>toFHIR Engine for executing mapping jobs and tasks.</p>
* <p>During initialization, the engine prioritizes the mapping and schema repositories provided as a constructor parameter.
* If they are not provided as constructor parameters, they are initialized as folder repository based on the folder paths set in the engine configurations.
* </p>
Expand All @@ -22,7 +21,7 @@ import org.apache.spark.sql.SparkSession
* @param schemaRepository Already instantiated schema repository that maintains a dynamically-updated data structure based on the operations on the schemas
* @param functionLibraryFactories External function libraries containing function to be used within FHIRPath expressions
*/
class ToFhirEngine(mappingRepository: Option[IFhirMappingCachedRepository] = None, schemaRepository: Option[IFhirSchemaLoader] = None, functionLibraryFactories: Map[String, IFhirPathFunctionLibraryFactory] = Map.empty) {
class ToFhirEngine(mappingRepository: Option[IFhirMappingRepository] = None, schemaRepository: Option[IFhirSchemaLoader] = None, functionLibraryFactories: Map[String, IFhirPathFunctionLibraryFactory] = Map.empty) {
// Validate that both mapping and schema repositories are empty or non-empty
if (mappingRepository.nonEmpty && schemaRepository.isEmpty || mappingRepository.isEmpty && schemaRepository.nonEmpty) {
throw EngineInitializationException("Mapping and schema repositories should both empty or non-empty")
Expand All @@ -33,7 +32,7 @@ class ToFhirEngine(mappingRepository: Option[IFhirMappingCachedRepository] = Non
val sparkSession: SparkSession = ToFhirConfig.sparkSession

//Repository for mapping definitions
val mappingRepo: IFhirMappingCachedRepository = mappingRepository.getOrElse(new FhirMappingFolderRepository(FileUtils.getPath(engineConfig.mappingRepositoryFolderPath).toUri))
val mappingRepo: IFhirMappingRepository = mappingRepository.getOrElse(new FhirMappingFolderRepository(FileUtils.getPath(engineConfig.mappingRepositoryFolderPath).toUri))

//Context loader
val contextLoader: IMappingContextLoader = new MappingContextLoader
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
package io.tofhir.engine.cli

import io.tofhir.engine.Execution.actorSystem.dispatcher
import io.tofhir.engine.ToFhirEngine
import io.tofhir.engine.cli.command.{CommandExecutionContext, CommandFactory, Load}
import io.tofhir.engine.mapping.job.{FhirMappingJobManager, MappingJobScheduler}
import io.tofhir.engine.model.FhirMappingJobExecution
import io.tofhir.engine.util.FhirMappingJobFormatter
import io.tofhir.engine.{ToFhirEngine, cli}
import it.sauronsoftware.cron4j.Scheduler
import org.json4s.MappingException

import java.io.FileNotFoundException
import java.net.URI
import java.nio.file.Paths
import java.util.Scanner
import scala.annotation.tailrec
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.Try

object CommandLineInterface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package io.tofhir.engine.cli.command
import io.tofhir.engine.mapping.job.FhirMappingJobManager
import io.tofhir.engine.model.{FhirMappingJob, FhirMappingJobExecution}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try
import io.tofhir.engine.Execution.actorSystem.dispatcher

class Run extends Command {
override def execute(args: Seq[String], context: CommandExecutionContext): CommandExecutionContext = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.tofhir.engine.model._
import io.tofhir.engine.model.exception.FhirMappingException
import org.json4s.JsonAST.{JArray, JNull, JObject, JValue}

import scala.concurrent.ExecutionContext.Implicits.global
import io.tofhir.engine.Execution.actorSystem.dispatcher
import scala.concurrent.Future

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import io.tofhir.engine.util.{CsvUtil, FileUtils}

import java.io.File
import java.nio.file.Paths
import scala.concurrent.ExecutionContext.Implicits.global
import io.tofhir.engine.Execution.actorSystem.dispatcher
import scala.concurrent.Future

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.json4s.{JArray, JBool, JObject, JString}

import java.io.File
import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionContext.Implicits.global
import io.tofhir.engine.Execution.actorSystem.dispatcher
import scala.concurrent.Future
import scala.concurrent.duration.Duration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.io.Source
*
* @param folderUri Path to the folder
*/
class FhirMappingFolderRepository(folderUri: URI) extends IFhirMappingCachedRepository {
class FhirMappingFolderRepository(folderUri: URI) extends IFhirMappingRepository {
private val logger: Logger = Logger(this.getClass)

private var fhirMappings: Map[String, FhirMapping] = loadMappings(folderUri)
Expand All @@ -39,20 +39,20 @@ class FhirMappingFolderRepository(folderUri: URI) extends IFhirMappingCachedRepo
case e: Throwable => throw FhirMappingException(s"Given folder for the mapping repository is not valid.", e)
}
files.map { f =>
val source = Source.fromFile(f, StandardCharsets.UTF_8.name()) // read the JSON file
val fileContent = try source.mkString finally source.close()
val fhirMapping = try {
JsonMethods.parse(fileContent).removeField { // Remove any fields starting with @ from the JSON.
case JField(fieldName, _) if fieldName.startsWith("@") => true
case _ => false
}.extractOpt[FhirMapping]
} catch {
case e: Exception =>
logger.error(s"Cannot parse the mapping file ${f.getAbsolutePath}.")
Option.empty[FhirMapping]
}
fhirMapping -> f
}.filter(_._1.nonEmpty) // Remove the elements from the list if they are not valid FhirMapping JSONs
val source = Source.fromFile(f, StandardCharsets.UTF_8.name()) // read the JSON file
val fileContent = try source.mkString finally source.close()
val fhirMapping = try {
JsonMethods.parse(fileContent).removeField { // Remove any fields starting with @ from the JSON.
case JField(fieldName, _) if fieldName.startsWith("@") => true
case _ => false
}.extractOpt[FhirMapping]
} catch {
case e: Exception =>
logger.error(s"Cannot parse the mapping file ${f.getAbsolutePath}.")
Option.empty[FhirMapping]
}
fhirMapping -> f
}.filter(_._1.nonEmpty) // Remove the elements from the list if they are not valid FhirMapping JSONs
.map { case (fm, file) => fm.get -> file } // Get rid of the Option
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package io.tofhir.engine.repository.mapping

import io.tofhir.common.model.ICachedRepository
import io.tofhir.engine.model.FhirMapping

trait IFhirMappingRepository {
trait IFhirMappingRepository extends ICachedRepository {
/**
* Return the Fhir mapping definition by given url
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.fasterxml.jackson.dataformat.csv.{CsvMapper, CsvSchema}
import io.tofhir.engine.Execution.actorSystem

import java.io.{File, FileInputStream, InputStreamReader}
import scala.concurrent.ExecutionContext.Implicits.global
import io.tofhir.engine.Execution.actorSystem.dispatcher
import scala.concurrent.Future
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.jdk.javaapi.CollectionConverters
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.tofhir.engine.util

import io.onfhir.client.model.{BasicAuthenticationSettings, BearerTokenAuthorizationSettings, FixedTokenAuthenticationSettings}
import io.tofhir.common.env.EnvironmentVariableResolver
import io.tofhir.engine.model.{FhirMappingJob, FhirMappingTask, FhirRepositorySinkSettings, FhirServerSource, FhirServerSourceSettings, FileSystemSinkSettings, FileSystemSource, FileSystemSourceSettings, KafkaSource, KafkaSourceSettings, LocalFhirTerminologyServiceSettings, SQLSchedulingSettings, SchedulingSettings, SqlSource, SqlSourceSettings}
import org.json4s.{Formats, MappingException, ShortTypeHints}
import org.json4s.jackson.Serialization
Expand Down Expand Up @@ -59,7 +60,7 @@ object FhirMappingJobFormatter {
*/
def readMappingJobFromFile(filePath: String): FhirMappingJob = {
val source = Source.fromFile(filePath, StandardCharsets.UTF_8.name())
val fileContent = try replaceEnvironmentVariables(source.mkString) finally source.close()
val fileContent = try EnvironmentVariableResolver.replaceEnvironmentVariables(source.mkString) finally source.close()
val mappingJob = org.json4s.jackson.JsonMethods.parse(fileContent).extract[FhirMappingJob]
// check there are no duplicate name on mappingTasks of the job
val duplicateMappingTasks = FhirMappingJobFormatter.findDuplicateMappingTaskNames(mappingJob.mappings)
Expand All @@ -69,22 +70,6 @@ object FhirMappingJobFormatter {
mappingJob
}

private def replaceEnvironmentVariables(fileContent: String): String = {
var returningContent = fileContent;
// val regex = """\$\{(.*?)\}""".r
EnvironmentVariable.values.foreach { e =>
val regex = "\\$\\{" + e.toString + "\\}"
if (sys.env.contains(e.toString)) returningContent = returningContent.replaceAll(regex, sys.env(e.toString))
}
returningContent
}

object EnvironmentVariable extends Enumeration {
type EnvironmentVariable = Value
final val FHIR_REPO_URL = Value("FHIR_REPO_URL");
final val DATA_FOLDER_PATH= Value("DATA_FOLDER_PATH");
}

/**
* Check for duplicate names in the mappingTask array from the mappingJob definition.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import io.tofhir.server.common.model.ToFhirRestCall
import io.tofhir.server.repository.job.JobFolderRepository
import io.tofhir.server.repository.mapping.ProjectMappingFolderRepository
import io.tofhir.server.repository.mappingContext.MappingContextFolderRepository
import io.tofhir.server.repository.project.ProjectFolderRepository
import io.tofhir.server.repository.project.{IProjectRepository, ProjectFolderRepository}
import io.tofhir.server.repository.schema.SchemaFolderRepository
import io.tofhir.server.repository.terminology.{ITerminologySystemRepository, TerminologySystemFolderRepository}
import io.tofhir.server.repository.terminology.codesystem.{CodeSystemRepository, ICodeSystemRepository}
Expand All @@ -30,27 +30,27 @@ import java.util.UUID
*/
class ToFhirServerEndpoint(toFhirEngineConfig: ToFhirEngineConfig, webServerConfig: WebServerConfig, fhirDefinitionsConfig: FhirDefinitionsConfig, redCapServiceConfig: Option[RedCapServiceConfig]) extends ICORSHandler with IErrorHandler {

val projectRepository: ProjectFolderRepository = new ProjectFolderRepository(toFhirEngineConfig) // creating the repository instance globally as weed a singleton instance
val projectRepository: IProjectRepository = new ProjectFolderRepository(toFhirEngineConfig) // creating the repository instance globally as weed a singleton instance
dogukan10 marked this conversation as resolved.
Show resolved Hide resolved
val mappingRepository: ProjectMappingFolderRepository = new ProjectMappingFolderRepository(toFhirEngineConfig.mappingRepositoryFolderPath, projectRepository)
dogukan10 marked this conversation as resolved.
Show resolved Hide resolved
val schemaRepository: SchemaFolderRepository = new SchemaFolderRepository(toFhirEngineConfig.schemaRepositoryFolderPath, projectRepository)
val mappingJobRepository: JobFolderRepository = new JobFolderRepository(toFhirEngineConfig.jobRepositoryFolderPath, projectRepository)
val mappingContextRepository: MappingContextFolderRepository = new MappingContextFolderRepository(toFhirEngineConfig.mappingContextRepositoryFolderPath, projectRepository)
val terminologySystemFolderRepository: ITerminologySystemRepository = new TerminologySystemFolderRepository(toFhirEngineConfig.terminologySystemFolderPath)
val terminologySystemRepository: ITerminologySystemRepository = new TerminologySystemFolderRepository(toFhirEngineConfig.terminologySystemFolderPath)
val conceptMapRepository: IConceptMapRepository = new ConceptMapRepository(toFhirEngineConfig.terminologySystemFolderPath)
val codeSystemRepository: ICodeSystemRepository = new CodeSystemRepository(toFhirEngineConfig.terminologySystemFolderPath)

// Initialize the projects by reading the resources available in the file system
val folderDBInitializer = new FolderDBInitializer(schemaRepository, mappingRepository, mappingJobRepository, projectRepository, mappingContextRepository)
val folderDBInitializer = new FolderDBInitializer(schemaRepository, mappingRepository, mappingJobRepository, projectRepository.asInstanceOf[ProjectFolderRepository], mappingContextRepository)
folderDBInitializer.init()

val projectEndpoint = new ProjectEndpoint(schemaRepository, mappingRepository, mappingJobRepository, mappingContextRepository, projectRepository)
val fhirDefinitionsEndpoint = new FhirDefinitionsEndpoint(fhirDefinitionsConfig)
val fhirPathFunctionsEndpoint = new FhirPathFunctionsEndpoint(Seq("io.onfhir.path", "io.tofhir.engine.mapping"))
val redcapEndpoint = redCapServiceConfig.map(config => new RedCapEndpoint(config))
val fileSystemTreeStructureEndpoint = new FileSystemTreeStructureEndpoint()
val terminologyServiceManagerEndpoint = new TerminologyServiceManagerEndpoint(terminologySystemFolderRepository, conceptMapRepository, codeSystemRepository, mappingJobRepository)
val terminologyServiceManagerEndpoint = new TerminologyServiceManagerEndpoint(terminologySystemRepository, conceptMapRepository, codeSystemRepository, mappingJobRepository)
val metadataEndpoint = new MetadataEndpoint(toFhirEngineConfig, webServerConfig, fhirDefinitionsConfig, redCapServiceConfig)
val reloadEndpoint= new ReloadEndpoint(mappingRepository, schemaRepository, mappingJobRepository, mappingContextRepository, terminologySystemFolderRepository.asInstanceOf[TerminologySystemFolderRepository], folderDBInitializer)
val reloadEndpoint= new ReloadEndpoint(mappingRepository, schemaRepository, mappingJobRepository, mappingContextRepository, terminologySystemRepository.asInstanceOf[TerminologySystemFolderRepository], folderDBInitializer)
// Custom rejection handler to send proper messages to user
val toFhirRejectionHandler: RejectionHandler = ToFhirRejectionHandler.getRejectionHandler()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
package io.tofhir.server.repository.job

import io.tofhir.common.model.ICachedRepository
import io.tofhir.engine.model.FhirMappingJob
import io.tofhir.server.repository.project.IProjectList

import scala.collection.mutable
import scala.concurrent.Future

trait IJobRepository {
trait IJobRepository extends ICachedRepository with IProjectList[FhirMappingJob] {

/**
* Returns a map of mapping jobs managed by this repository
* Retrieve all jobs of a project
*
* @param projectId project id the jobs belong to
* @return
*/
def getCachedMappingsJobs: mutable.Map[String, mutable.Map[String, FhirMappingJob]]
def getAllJobs(projectId: String): Future[Seq[FhirMappingJob]]

/**
* Retrieve all jobs
* Retrieves the jobs referencing the given mapping in their definitions.
*
* @param projectId project id the jobs belong to
* @return
* @param projectId identifier of project whose jobs will be checked
* @param mappingUrl the url of mapping
* @return the jobs referencing the given mapping in their definitions
*/
def getAllJobs(projectId: String): Future[Seq[FhirMappingJob]]
def getJobsReferencingMapping(projectId: String, mappingUrl: String): Future[Seq[FhirMappingJob]]

/**
* Save the job to the repository.
Expand Down Expand Up @@ -64,14 +67,6 @@ trait IJobRepository {
*
* @param projectId The unique identifier of the project for which jobs should be deleted.
*/
def deleteProjectJobs(projectId: String): Unit
def deleteAllJobs(projectId: String): Future[Unit]

/**
* Retrieves the jobs referencing the given mapping in their definitions.
*
* @param projectId identifier of project whose jobs will be checked
* @param mappingUrl the url of mapping
* @return the jobs referencing the given mapping in their definitions
*/
def getJobsReferencingMapping(projectId: String, mappingUrl: String): Future[Seq[FhirMappingJob]]
}
Loading