diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/ToFhirEngine.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/ToFhirEngine.scala index 98c93bd13..3cf2b35b4 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/ToFhirEngine.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/ToFhirEngine.scala @@ -4,16 +4,15 @@ import io.onfhir.path._ import io.tofhir.engine.config.{ToFhirConfig, ToFhirEngineConfig} import io.tofhir.engine.execution.RunningJobRegistry import io.tofhir.engine.execution.processing.FileStreamInputArchiver -import io.tofhir.engine.mapping._ import io.tofhir.engine.mapping.context.{IMappingContextLoader, MappingContextLoader} import io.tofhir.engine.mapping.schema.{IFhirSchemaLoader, SchemaFolderLoader} import io.tofhir.engine.model.exception.EngineInitializationException -import io.tofhir.engine.repository.mapping.{FhirMappingFolderRepository, IFhirMappingCachedRepository} +import io.tofhir.engine.repository.mapping.{FhirMappingFolderRepository, IFhirMappingRepository} import io.tofhir.engine.util.FileUtils import org.apache.spark.sql.SparkSession /** - *
tofhir Engine for executing mapping jobs and tasks.
+ * toFHIR Engine for executing mapping jobs and tasks.
*During initialization, the engine prioritizes the mapping and schema repositories provided as a constructor parameter. * If they are not provided as constructor parameters, they are initialized as folder repository based on the folder paths set in the engine configurations. *
@@ -22,7 +21,7 @@ import org.apache.spark.sql.SparkSession * @param schemaRepository Already instantiated schema repository that maintains a dynamically-updated data structure based on the operations on the schemas * @param functionLibraryFactories External function libraries containing function to be used within FHIRPath expressions */ -class ToFhirEngine(mappingRepository: Option[IFhirMappingCachedRepository] = None, schemaRepository: Option[IFhirSchemaLoader] = None, functionLibraryFactories: Map[String, IFhirPathFunctionLibraryFactory] = Map.empty) { +class ToFhirEngine(mappingRepository: Option[IFhirMappingRepository] = None, schemaRepository: Option[IFhirSchemaLoader] = None, functionLibraryFactories: Map[String, IFhirPathFunctionLibraryFactory] = Map.empty) { // Validate that both mapping and schema repositories are empty or non-empty if (mappingRepository.nonEmpty && schemaRepository.isEmpty || mappingRepository.isEmpty && schemaRepository.nonEmpty) { throw EngineInitializationException("Mapping and schema repositories should both empty or non-empty") @@ -33,7 +32,7 @@ class ToFhirEngine(mappingRepository: Option[IFhirMappingCachedRepository] = Non val sparkSession: SparkSession = ToFhirConfig.sparkSession //Repository for mapping definitions - val mappingRepo: IFhirMappingCachedRepository = mappingRepository.getOrElse(new FhirMappingFolderRepository(FileUtils.getPath(engineConfig.mappingRepositoryFolderPath).toUri)) + val mappingRepo: IFhirMappingRepository = mappingRepository.getOrElse(new FhirMappingFolderRepository(FileUtils.getPath(engineConfig.mappingRepositoryFolderPath).toUri)) //Context loader val contextLoader: IMappingContextLoader = new MappingContextLoader diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/cli/CommandLineInterface.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/cli/CommandLineInterface.scala index 4bed0e405..f58ed2065 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/cli/CommandLineInterface.scala +++ 
b/tofhir-engine/src/main/scala/io/tofhir/engine/cli/CommandLineInterface.scala @@ -1,21 +1,18 @@ package io.tofhir.engine.cli +import io.tofhir.engine.Execution.actorSystem.dispatcher +import io.tofhir.engine.ToFhirEngine import io.tofhir.engine.cli.command.{CommandExecutionContext, CommandFactory, Load} import io.tofhir.engine.mapping.job.{FhirMappingJobManager, MappingJobScheduler} import io.tofhir.engine.model.FhirMappingJobExecution import io.tofhir.engine.util.FhirMappingJobFormatter -import io.tofhir.engine.{ToFhirEngine, cli} -import it.sauronsoftware.cron4j.Scheduler import org.json4s.MappingException import java.io.FileNotFoundException -import java.net.URI -import java.nio.file.Paths import java.util.Scanner import scala.annotation.tailrec -import scala.concurrent.{Await, Future} -import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} import scala.util.Try object CommandLineInterface { diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/cli/command/Run.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/cli/command/Run.scala index e342ea38b..42028bf90 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/cli/command/Run.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/cli/command/Run.scala @@ -3,8 +3,7 @@ package io.tofhir.engine.cli.command import io.tofhir.engine.mapping.job.FhirMappingJobManager import io.tofhir.engine.model.{FhirMappingJob, FhirMappingJobExecution} -import scala.concurrent.ExecutionContext.Implicits.global -import scala.util.Try +import io.tofhir.engine.Execution.actorSystem.dispatcher class Run extends Command { override def execute(args: Seq[String], context: CommandExecutionContext): CommandExecutionContext = { diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariable.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariable.scala new file mode 100644 index 000000000..70b7c684e 
--- /dev/null +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariable.scala @@ -0,0 +1,7 @@ +package io.tofhir.engine.env + +object EnvironmentVariable extends Enumeration { + type EnvironmentVariable = Value + final val FHIR_REPO_URL = Value("FHIR_REPO_URL") + final val DATA_FOLDER_PATH= Value("DATA_FOLDER_PATH") +} diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariableResolver.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariableResolver.scala new file mode 100644 index 000000000..e03bf2080 --- /dev/null +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/env/EnvironmentVariableResolver.scala @@ -0,0 +1,110 @@ +package io.tofhir.engine.env + +import io.tofhir.engine.model.{FhirMappingJob, FhirRepositorySinkSettings, FhirSinkSettings, FileSystemSourceSettings, MappingJobSourceSettings} + +/** + * Provide functions to find and resolve the environment variables used within FhirMappingJob definitions. + * An ENV_VAR is given as ${ENV_VAR} within the MappingJob definition. + */ +object EnvironmentVariableResolver { + + /** + * Reads the environment variables defined in the EnvironmentVariable object. + * + * @return A Map[String, String] containing the environment variable names and their values. + */ + def getEnvironmentVariables: Map[String, String] = { + EnvironmentVariable.values + .map(_.toString) // Convert Enumeration values to String (names of the variables) + .flatMap { envVar => + sys.env.get(envVar).map(envVar -> _) // Get the environment variable value if it exists + } + .toMap + } + + /** + * Resolves the environment variables from a given String which is a file content of a FhirMappingJob definition. + * + * @param fileContent The file content potentially containing placeholders for environment variables. + * @return The file content with all recognized environment variables resolved. 
+ */ + def resolveFileContent(fileContent: String): String = { + EnvironmentVariable.values.foldLeft(fileContent) { (updatedContent, envVar) => + val placeholder = "\\$\\{" + envVar.toString + "\\}" + sys.env.get(envVar.toString) match { + case Some(envValue) => + updatedContent.replaceAll(placeholder, envValue) + case None => + updatedContent // Leave the content unchanged if the environment variable is not set + } + } + } + + /** + * Resolves all environment variable placeholders in a FhirMappingJob instance. + * + * @param job FhirMappingJob instance. + * @return A new FhirMappingJob with resolved values. + */ + def resolveFhirMappingJob(job: FhirMappingJob): FhirMappingJob = { + job.copy( + sourceSettings = resolveSourceSettings(job.sourceSettings), + sinkSettings = resolveSinkSettings(job.sinkSettings) + ) + } + + /** + * Resolves environment variables in MappingJobSourceSettings. + * + * @param sourceSettings A map of source settings. + * @return The map with resolved settings. + */ + private def resolveSourceSettings(sourceSettings: Map[String, MappingJobSourceSettings]): Map[String, MappingJobSourceSettings] = { + sourceSettings.map { case (key, value) => + key -> (value match { + case fs: FileSystemSourceSettings => + fs.copy(dataFolderPath = resolveEnvironmentVariables(fs.dataFolderPath)) + case other => other // No changes for other types + }) + } + } + + + /** + * Resolves environment variables in FhirRepositorySinkSettings. + * + * @param sinkSettings FhirSinkSettings instance. + * @return FhirSinkSettings with resolved fhirRepoUrl, if applicable. 
+ */ + private def resolveSinkSettings(sinkSettings: FhirSinkSettings): FhirSinkSettings = { + sinkSettings match { + case fr: FhirRepositorySinkSettings => + fr.copy(fhirRepoUrl = resolveEnvironmentVariables(fr.fhirRepoUrl)) + case other => other // No changes for other types + } + } + + /** + * Resolves placeholders in the format ${ENV_VAR} with their corresponding values + * from the system environment variables, but only if the variables are part + * of the EnvironmentVariable enumeration. + * + * @param value The string potentially containing placeholders. + * @return The resolved string with placeholders replaced. + */ + private def resolveEnvironmentVariables(value: String): String = { + val pattern = """\$\{([A-Z_]+)\}""".r + + pattern.replaceAllIn(value, m => { + val envVar = m.group(1) + if (EnvironmentVariable.values.exists(_.toString == envVar)) { + sys.env.getOrElse(envVar, { + throw new RuntimeException(s"Environment variable $envVar is not set.") + }) + } else { + throw new RuntimeException(s"Environment variable $envVar is not recognized in EnvironmentVariable enumeration.") + } + }) + } + +} diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingService.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingService.scala index 057309d88..0f4c2564f 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingService.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingService.scala @@ -11,7 +11,7 @@ import io.tofhir.engine.model._ import io.tofhir.engine.model.exception.FhirMappingException import org.json4s.JsonAST.{JArray, JNull, JObject, JValue} -import scala.concurrent.ExecutionContext.Implicits.global +import io.tofhir.engine.Execution.actorSystem.dispatcher import scala.concurrent.Future /** diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/context/MappingContextLoader.scala 
b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/context/MappingContextLoader.scala index 6fc4a138e..07e5b90f7 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/context/MappingContextLoader.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/context/MappingContextLoader.scala @@ -6,7 +6,7 @@ import io.tofhir.engine.util.{CsvUtil, FileUtils} import java.io.File import java.nio.file.Paths -import scala.concurrent.ExecutionContext.Implicits.global +import io.tofhir.engine.Execution.actorSystem.dispatcher import scala.concurrent.Future /** diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/job/FhirMappingJobManager.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/job/FhirMappingJobManager.scala index 91cffd9c9..e3e718ba1 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/job/FhirMappingJobManager.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/job/FhirMappingJobManager.scala @@ -334,7 +334,7 @@ class FhirMappingJobManager( case (fj, (df, i)) => fj.flatMap(_ => executeTask(mappingJobExecution.jobId, mappingTask.name, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId)) .map(dataset => SinkHandler.writeMappingResult(spark, mappingJobExecution, Some(mappingTask.name), dataset, fhirWriter)) - .map(_ => logger.debug(s"Chunk ${i + 1} is completed for mapping ${mappingTask.name} within MappingJob: ${mappingJobExecution.jobId}...")) + .map(_ => logger.debug(s"Chunk ${i + 1} / $numOfChunks is completed for mapping ${mappingTask.name} within MappingJob: ${mappingJobExecution.jobId}...")) ) } } diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/service/LocalTerminologyService.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/service/LocalTerminologyService.scala index d132518d5..6ee0e5e15 100644 --- 
a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/service/LocalTerminologyService.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/service/LocalTerminologyService.scala @@ -12,7 +12,7 @@ import org.json4s.{JArray, JBool, JObject, JString} import java.io.File import java.util.concurrent.TimeUnit -import scala.concurrent.ExecutionContext.Implicits.global +import io.tofhir.engine.Execution.actorSystem.dispatcher import scala.concurrent.Future import scala.concurrent.duration.Duration diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/repository/ICachedRepository.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/repository/ICachedRepository.scala new file mode 100644 index 000000000..7d5f2a3bf --- /dev/null +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/repository/ICachedRepository.scala @@ -0,0 +1,11 @@ +package io.tofhir.engine.repository + +/** + * A cached repository. + */ +trait ICachedRepository { + /** + * Invalidate the internal cache and refresh the cache content directly from their source + */ + def invalidate(): Unit +} diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/FhirMappingFolderRepository.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/FhirMappingFolderRepository.scala index d86719ad9..e96adc9f0 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/FhirMappingFolderRepository.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/FhirMappingFolderRepository.scala @@ -20,7 +20,7 @@ import scala.io.Source * * @param folderUri Path to the folder */ -class FhirMappingFolderRepository(folderUri: URI) extends IFhirMappingCachedRepository { +class FhirMappingFolderRepository(folderUri: URI) extends IFhirMappingRepository { private val logger: Logger = Logger(this.getClass) private var fhirMappings: Map[String, FhirMapping] = loadMappings(folderUri) @@ -39,20 +39,20 @@ class FhirMappingFolderRepository(folderUri: URI) 
extends IFhirMappingCachedRepo case e: Throwable => throw FhirMappingException(s"Given folder for the mapping repository is not valid.", e) } files.map { f => - val source = Source.fromFile(f, StandardCharsets.UTF_8.name()) // read the JSON file - val fileContent = try source.mkString finally source.close() - val fhirMapping = try { - JsonMethods.parse(fileContent).removeField { // Remove any fields starting with @ from the JSON. - case JField(fieldName, _) if fieldName.startsWith("@") => true - case _ => false - }.extractOpt[FhirMapping] - } catch { - case e: Exception => - logger.error(s"Cannot parse the mapping file ${f.getAbsolutePath}.") - Option.empty[FhirMapping] - } - fhirMapping -> f - }.filter(_._1.nonEmpty) // Remove the elements from the list if they are not valid FhirMapping JSONs + val source = Source.fromFile(f, StandardCharsets.UTF_8.name()) // read the JSON file + val fileContent = try source.mkString finally source.close() + val fhirMapping = try { + JsonMethods.parse(fileContent).removeField { // Remove any fields starting with @ from the JSON. 
+ case JField(fieldName, _) if fieldName.startsWith("@") => true + case _ => false + }.extractOpt[FhirMapping] + } catch { + case e: Exception => + logger.error(s"Cannot parse the mapping file ${f.getAbsolutePath}.") + Option.empty[FhirMapping] + } + fhirMapping -> f + }.filter(_._1.nonEmpty) // Remove the elements from the list if they are not valid FhirMapping JSONs .map { case (fm, file) => fm.get -> file } // Get rid of the Option } diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/IFhirMappingCachedRepository.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/IFhirMappingCachedRepository.scala deleted file mode 100644 index 6afe65e3b..000000000 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/IFhirMappingCachedRepository.scala +++ /dev/null @@ -1,11 +0,0 @@ -package io.tofhir.engine.repository.mapping - -/** - * A cached repository for the Mappings - */ -trait IFhirMappingCachedRepository extends IFhirMappingRepository { - /** - * Invalidate the internal cache and refresh the cache with the FhirMappings directly from their source - */ - def invalidate(): Unit -} diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/IFhirMappingRepository.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/IFhirMappingRepository.scala index da1586f2c..1450ba66e 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/IFhirMappingRepository.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/repository/mapping/IFhirMappingRepository.scala @@ -1,8 +1,9 @@ package io.tofhir.engine.repository.mapping import io.tofhir.engine.model.FhirMapping +import io.tofhir.engine.repository.ICachedRepository -trait IFhirMappingRepository { +trait IFhirMappingRepository extends ICachedRepository { /** * Return the Fhir mapping definition by given url * diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/util/CsvUtil.scala 
b/tofhir-engine/src/main/scala/io/tofhir/engine/util/CsvUtil.scala index 84188dfa9..360e1b279 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/util/CsvUtil.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/util/CsvUtil.scala @@ -7,7 +7,7 @@ import com.fasterxml.jackson.dataformat.csv.{CsvMapper, CsvSchema} import io.tofhir.engine.Execution.actorSystem import java.io.{File, FileInputStream, InputStreamReader} -import scala.concurrent.ExecutionContext.Implicits.global +import io.tofhir.engine.Execution.actorSystem.dispatcher import scala.concurrent.Future import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.jdk.javaapi.CollectionConverters diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/util/FhirMappingJobFormatter.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/util/FhirMappingJobFormatter.scala index a23fbe8b1..a270eb6a2 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/util/FhirMappingJobFormatter.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/util/FhirMappingJobFormatter.scala @@ -1,9 +1,10 @@ package io.tofhir.engine.util import io.onfhir.client.model.{BasicAuthenticationSettings, BearerTokenAuthorizationSettings, FixedTokenAuthenticationSettings} -import io.tofhir.engine.model.{FhirMappingJob, FhirMappingTask, FhirRepositorySinkSettings, FhirServerSource, FhirServerSourceSettings, FileSystemSinkSettings, FileSystemSource, FileSystemSourceSettings, KafkaSource, KafkaSourceSettings, LocalFhirTerminologyServiceSettings, SQLSchedulingSettings, SchedulingSettings, SqlSource, SqlSourceSettings} -import org.json4s.{Formats, MappingException, ShortTypeHints} +import io.tofhir.engine.env.EnvironmentVariableResolver +import io.tofhir.engine.model._ import org.json4s.jackson.Serialization +import org.json4s.{Formats, MappingException, ShortTypeHints} import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} @@ -59,7 +60,7 @@ object FhirMappingJobFormatter { */ def 
readMappingJobFromFile(filePath: String): FhirMappingJob = { val source = Source.fromFile(filePath, StandardCharsets.UTF_8.name()) - val fileContent = try replaceEnvironmentVariables(source.mkString) finally source.close() + val fileContent = try EnvironmentVariableResolver.resolveFileContent(source.mkString) finally source.close() val mappingJob = org.json4s.jackson.JsonMethods.parse(fileContent).extract[FhirMappingJob] // check there are no duplicate name on mappingTasks of the job val duplicateMappingTasks = FhirMappingJobFormatter.findDuplicateMappingTaskNames(mappingJob.mappings) @@ -69,22 +70,6 @@ object FhirMappingJobFormatter { mappingJob } - private def replaceEnvironmentVariables(fileContent: String): String = { - var returningContent = fileContent; - // val regex = """\$\{(.*?)\}""".r - EnvironmentVariable.values.foreach { e => - val regex = "\\$\\{" + e.toString + "\\}" - if (sys.env.contains(e.toString)) returningContent = returningContent.replaceAll(regex, sys.env(e.toString)) - } - returningContent - } - - object EnvironmentVariable extends Enumeration { - type EnvironmentVariable = Value - final val FHIR_REPO_URL = Value("FHIR_REPO_URL"); - final val DATA_FOLDER_PATH= Value("DATA_FOLDER_PATH"); - } - /** * Check for duplicate names in the mappingTask array from the mappingJob definition. 
* diff --git a/tofhir-engine/src/test/scala/io/tofhir/test/FhirServerSourceTest.scala b/tofhir-engine/src/test/scala/io/tofhir/test/FhirServerSourceTest.scala index d85ccdb5d..33ec40d0b 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/test/FhirServerSourceTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/test/FhirServerSourceTest.scala @@ -170,7 +170,7 @@ class FhirServerSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToF secondRow.getAs[String]("encounterEnd") shouldEqual "2024-08-02T09:00:00Z" secondRow.getAs[String]("observationLoincCode") shouldEqual "85354-9" secondRow.getAs[String]("observationDate") shouldEqual "2024-08-01T10:15:00Z" - secondRow.getAs[String]("observationResult") shouldEqual "140.0 mmHg" + secondRow.getAs[String]("observationResult") shouldEqual "140 mmHg" secondRow.getAs[String]("conditionSnomedCode") shouldEqual "38341003" secondRow.getAs[String]("conditionDate") shouldEqual "2024-08-01" } diff --git a/tofhir-server/src/main/scala/io/tofhir/server/endpoint/ReloadEndpoint.scala b/tofhir-server/src/main/scala/io/tofhir/server/endpoint/ReloadEndpoint.scala index 944ee29f2..4ceff0fe2 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/endpoint/ReloadEndpoint.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/endpoint/ReloadEndpoint.scala @@ -7,32 +7,20 @@ import com.typesafe.scalalogging.LazyLogging import io.tofhir.engine.Execution.actorSystem.dispatcher import io.tofhir.server.common.model.ToFhirRestCall import io.tofhir.server.endpoint.ReloadEndpoint.SEGMENT_RELOAD +import io.tofhir.server.repository.{FolderDBInitializer, IRepositoryManager} import io.tofhir.server.repository.job.JobFolderRepository import io.tofhir.server.repository.mapping.ProjectMappingFolderRepository import io.tofhir.server.repository.mappingContext.MappingContextFolderRepository import io.tofhir.server.repository.schema.SchemaFolderRepository import io.tofhir.server.repository.terminology.TerminologySystemFolderRepository import 
io.tofhir.server.service.ReloadService -import io.tofhir.server.service.db.FolderDBInitializer /** * Endpoint to reload resources from the file system. * */ -class ReloadEndpoint(mappingRepository: ProjectMappingFolderRepository, - schemaRepository: SchemaFolderRepository, - mappingJobRepository: JobFolderRepository, - mappingContextRepository: MappingContextFolderRepository, - terminologySystemFolderRepository: TerminologySystemFolderRepository, - folderDBInitializer: FolderDBInitializer) extends LazyLogging { +class ReloadEndpoint(repositoryManager: IRepositoryManager) extends LazyLogging { - val reloadService: ReloadService = new ReloadService( - mappingRepository, - schemaRepository, - mappingJobRepository, - mappingContextRepository, - terminologySystemFolderRepository, - folderDBInitializer - ) + val reloadService: ReloadService = new ReloadService(repositoryManager) def route(request: ToFhirRestCall): Route = { pathPrefix(SEGMENT_RELOAD) { diff --git a/tofhir-server/src/main/scala/io/tofhir/server/endpoint/TerminologyServiceManagerEndpoint.scala b/tofhir-server/src/main/scala/io/tofhir/server/endpoint/TerminologyServiceManagerEndpoint.scala index e4faed9d6..96a1ec1c9 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/endpoint/TerminologyServiceManagerEndpoint.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/endpoint/TerminologyServiceManagerEndpoint.scala @@ -4,21 +4,23 @@ import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import com.typesafe.scalalogging.LazyLogging +import io.onfhir.definitions.common.model.Json4sSupport._ import io.tofhir.engine.Execution.actorSystem.dispatcher import io.tofhir.server.common.model.{ResourceNotFound, ToFhirRestCall} import io.tofhir.server.endpoint.TerminologyServiceManagerEndpoint._ -import io.onfhir.definitions.common.model.Json4sSupport._ import io.tofhir.server.model.TerminologySystem -import 
io.tofhir.server.repository.job.JobFolderRepository +import io.tofhir.server.repository.job.IJobRepository import io.tofhir.server.repository.terminology.ITerminologySystemRepository -import io.tofhir.server.service.terminology.TerminologySystemService import io.tofhir.server.repository.terminology.codesystem.ICodeSystemRepository import io.tofhir.server.repository.terminology.conceptmap.IConceptMapRepository +import io.tofhir.server.service.terminology.TerminologySystemService import scala.concurrent.Future -class TerminologyServiceManagerEndpoint(terminologySystemRepository: ITerminologySystemRepository, conceptMapRepository: IConceptMapRepository, - codeSystemRepository: ICodeSystemRepository, mappingJobRepository: JobFolderRepository) extends LazyLogging { +class TerminologyServiceManagerEndpoint(terminologySystemRepository: ITerminologySystemRepository, + conceptMapRepository: IConceptMapRepository, + codeSystemRepository: ICodeSystemRepository, + mappingJobRepository: IJobRepository) extends LazyLogging { private val terminologySystemService: TerminologySystemService = new TerminologySystemService(terminologySystemRepository, mappingJobRepository) diff --git a/tofhir-server/src/main/scala/io/tofhir/server/endpoint/ToFhirServerEndpoint.scala b/tofhir-server/src/main/scala/io/tofhir/server/endpoint/ToFhirServerEndpoint.scala index 3a3d6ab9f..bcf0bd007 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/endpoint/ToFhirServerEndpoint.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/endpoint/ToFhirServerEndpoint.scala @@ -13,14 +13,14 @@ import io.tofhir.server.common.model.ToFhirRestCall import io.tofhir.server.repository.job.JobFolderRepository import io.tofhir.server.repository.mapping.ProjectMappingFolderRepository import io.tofhir.server.repository.mappingContext.MappingContextFolderRepository -import io.tofhir.server.repository.project.ProjectFolderRepository +import io.tofhir.server.repository.project.{IProjectRepository, 
ProjectFolderRepository} import io.tofhir.server.repository.schema.SchemaFolderRepository import io.tofhir.server.repository.terminology.{ITerminologySystemRepository, TerminologySystemFolderRepository} -import io.tofhir.server.repository.terminology.codesystem.{CodeSystemRepository, ICodeSystemRepository} -import io.tofhir.server.repository.terminology.conceptmap.{ConceptMapRepository, IConceptMapRepository} -import io.tofhir.server.service.db.FolderDBInitializer +import io.tofhir.server.repository.terminology.codesystem.{CodeSystemFolderRepository, ICodeSystemRepository} +import io.tofhir.server.repository.terminology.conceptmap.{ConceptMapFolderRepository, IConceptMapRepository} import io.tofhir.server.util.ToFhirRejectionHandler import io.onfhir.definitions.fhirpath.endpoint.FhirPathFunctionsEndpoint +import io.tofhir.server.repository.{FolderDBInitializer, FolderRepositoryManager, IRepositoryManager} import java.util.UUID @@ -30,27 +30,33 @@ import java.util.UUID */ class ToFhirServerEndpoint(toFhirEngineConfig: ToFhirEngineConfig, webServerConfig: WebServerConfig, fhirDefinitionsConfig: FhirDefinitionsConfig, redCapServiceConfig: Option[RedCapServiceConfig]) extends ICORSHandler with IErrorHandler { - val projectRepository: ProjectFolderRepository = new ProjectFolderRepository(toFhirEngineConfig) // creating the repository instance globally as weed a singleton instance - val mappingRepository: ProjectMappingFolderRepository = new ProjectMappingFolderRepository(toFhirEngineConfig.mappingRepositoryFolderPath, projectRepository) - val schemaRepository: SchemaFolderRepository = new SchemaFolderRepository(toFhirEngineConfig.schemaRepositoryFolderPath, projectRepository) - val mappingJobRepository: JobFolderRepository = new JobFolderRepository(toFhirEngineConfig.jobRepositoryFolderPath, projectRepository) - val mappingContextRepository: MappingContextFolderRepository = new MappingContextFolderRepository(toFhirEngineConfig.mappingContextRepositoryFolderPath, 
projectRepository) - val terminologySystemFolderRepository: ITerminologySystemRepository = new TerminologySystemFolderRepository(toFhirEngineConfig.terminologySystemFolderPath) - val conceptMapRepository: IConceptMapRepository = new ConceptMapRepository(toFhirEngineConfig.terminologySystemFolderPath) - val codeSystemRepository: ICodeSystemRepository = new CodeSystemRepository(toFhirEngineConfig.terminologySystemFolderPath) + private val repositoryManager: IRepositoryManager = new FolderRepositoryManager(toFhirEngineConfig) + // Initialize repositories by reading the resources available in the file system + repositoryManager.init() - // Initialize the projects by reading the resources available in the file system - val folderDBInitializer = new FolderDBInitializer(schemaRepository, mappingRepository, mappingJobRepository, projectRepository, mappingContextRepository) - folderDBInitializer.init() + private val projectEndpoint = new ProjectEndpoint( + repositoryManager.schemaRepository, + repositoryManager.mappingRepository, + repositoryManager.mappingJobRepository, + repositoryManager.mappingContextRepository, + repositoryManager.projectRepository + ) + + val terminologyServiceManagerEndpoint = new TerminologyServiceManagerEndpoint( + repositoryManager.terminologySystemRepository, + repositoryManager.conceptMapRepository, + repositoryManager.codeSystemRepository, + repositoryManager.mappingJobRepository + ) - val projectEndpoint = new ProjectEndpoint(schemaRepository, mappingRepository, mappingJobRepository, mappingContextRepository, projectRepository) val fhirDefinitionsEndpoint = new FhirDefinitionsEndpoint(fhirDefinitionsConfig) val fhirPathFunctionsEndpoint = new FhirPathFunctionsEndpoint(Seq("io.onfhir.path", "io.tofhir.engine.mapping")) val redcapEndpoint = redCapServiceConfig.map(config => new RedCapEndpoint(config)) val fileSystemTreeStructureEndpoint = new FileSystemTreeStructureEndpoint() - val terminologyServiceManagerEndpoint = new 
TerminologyServiceManagerEndpoint(terminologySystemFolderRepository, conceptMapRepository, codeSystemRepository, mappingJobRepository) val metadataEndpoint = new MetadataEndpoint(toFhirEngineConfig, webServerConfig, fhirDefinitionsConfig, redCapServiceConfig) - val reloadEndpoint= new ReloadEndpoint(mappingRepository, schemaRepository, mappingJobRepository, mappingContextRepository, terminologySystemFolderRepository.asInstanceOf[TerminologySystemFolderRepository], folderDBInitializer) + + val reloadEndpoint= new ReloadEndpoint(repositoryManager) + // Custom rejection handler to send proper messages to user val toFhirRejectionHandler: RejectionHandler = ToFhirRejectionHandler.getRejectionHandler() diff --git a/tofhir-server/src/main/scala/io/tofhir/server/model/Metadata.scala b/tofhir-server/src/main/scala/io/tofhir/server/model/Metadata.scala index eaec0aa19..d75347d37 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/model/Metadata.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/model/Metadata.scala @@ -1,7 +1,7 @@ package io.tofhir.server.model /** - * Model that represents the metadata of the server. + * Model that represents the metadata of the toFHIR server. * * @param name The name of the server. * @param description A description of the server. 
@@ -21,7 +21,8 @@ case class Metadata(name: String, definitionsRootUrls: Option[Seq[String]], schemasFhirVersion: String, repositoryNames: RepositoryNames, - archiving: Archiving + archiving: Archiving, + environmentVariables: Map[String, String] ) /** diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/db/FolderDBInitializer.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/FolderDBInitializer.scala similarity index 66% rename from tofhir-server/src/main/scala/io/tofhir/server/service/db/FolderDBInitializer.scala rename to tofhir-server/src/main/scala/io/tofhir/server/repository/FolderDBInitializer.scala index 3e8174844..ec8b3dc8e 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/db/FolderDBInitializer.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/FolderDBInitializer.scala @@ -1,4 +1,4 @@ -package io.tofhir.server.service.db +package io.tofhir.server.repository import com.typesafe.scalalogging.Logger import io.onfhir.definitions.common.model.Json4sSupport.formats @@ -23,10 +23,10 @@ import scala.language.postfixOps /** * Folder/Directory based database initializer implementation. 
* */ -class FolderDBInitializer(schemaFolderRepository: SchemaFolderRepository, +class FolderDBInitializer(projectFolderRepository: ProjectFolderRepository, + schemaFolderRepository: SchemaFolderRepository, mappingFolderRepository: ProjectMappingFolderRepository, mappingJobFolderRepository: JobFolderRepository, - projectFolderRepository: ProjectFolderRepository, mappingContextRepository: MappingContextFolderRepository) { private val logger: Logger = Logger(this.getClass) @@ -40,12 +40,10 @@ class FolderDBInitializer(schemaFolderRepository: SchemaFolderRepository, val parsedProjects = if (file.exists()) { val projects: JArray = FileOperations.readFileIntoJson(file).asInstanceOf[JArray] - val projectMap: Map[String, Project] = projects.arr.map(p => { - val project: Project = initProjectFromMetadata(p.asInstanceOf[JObject]) - project.id -> project - }) - .toMap - collection.mutable.Map(projectMap.toSeq: _*) + projects.arr.map(p => { + val project: Project = initProjectFromMetadata(p.asInstanceOf[JObject]) + project.id -> project + }).toMap } else { logger.debug(s"There does not exist a metadata file (${ProjectFolderRepository.PROJECTS_JSON}) for projects. Creating it...") file.createNewFile() @@ -55,7 +53,6 @@ class FolderDBInitializer(schemaFolderRepository: SchemaFolderRepository, } // Inject the parsed projects to the repository projectFolderRepository.setProjects(parsedProjects) - projectFolderRepository.updateProjectsMetadata() // update the metadata file after initialization } /** @@ -127,55 +124,51 @@ class FolderDBInitializer(schemaFolderRepository: SchemaFolderRepository, * * @return */ - private def initProjectsWithResources(): mutable.Map[String, Project] = { + private def initProjectsWithResources(): Map[String, Project] = { // Map keeping the projects. It uses the project name as a key. 
val projects: mutable.Map[String, Project] = mutable.Map.empty // Parse schemas - val schemas: mutable.Map[String, mutable.Map[String, SchemaDefinition]] = schemaFolderRepository.getCachedSchemas() - schemas.foreach(projectIdAndSchemas => { - val projectId: String = projectIdAndSchemas._1 - val schemaUrl: String = projectIdAndSchemas._2.head._2.url - // If there is no project create a new one. Use id as name as well - val project: Project = projects.get(projectId) match { - case Some(existingProject) => existingProject.copy(schemaUrlPrefix = Some(dropLastPart(schemaUrl))) - case None => Project(id = projectId, name = projectId, schemaUrlPrefix = Some(dropLastPart(schemaUrl))) - } - projects.put(projectId, project.copy(schemas = projectIdAndSchemas._2.values.toSeq)) - }) + schemaFolderRepository.getProjectPairs.foreach { + case (projectId, schemaDefinitionList) => + val schemaUrl = schemaDefinitionList.head.url + // If there is no project create a new one. Use id as name as well + val project: Project = projects.get(projectId) match { + case Some(existingProject) => existingProject.copy(schemaUrlPrefix = Some(dropLastPart(schemaUrl))) + case None => Project(id = projectId, name = projectId, schemaUrlPrefix = Some(dropLastPart(schemaUrl))) + } + projects.put(projectId, project.copy(schemas = schemaDefinitionList)) + } // Parse mappings - val mappings: mutable.Map[String, mutable.Map[String, FhirMapping]] = mappingFolderRepository.getCachedMappings() - mappings.foreach(projectIdAndMappings => { - val projectId: String = projectIdAndMappings._1 - val mappingUrl: String = projectIdAndMappings._2.head._2.url - // If there is no project create a new one. 
Use id as name as well - val project: Project = projects.get(projectId) match { - case Some(existingProject) => existingProject.copy(mappingUrlPrefix = Some(dropLastPart(mappingUrl))) - case None => Project(id = projectId, name = projectId, mappingUrlPrefix = Some(dropLastPart(mappingUrl))) - } - projects.put(projectId, project.copy(mappings = projectIdAndMappings._2.values.toSeq)) - }) + mappingFolderRepository.getProjectPairs.foreach { + case (projectId, mappingList) => + val mappingUrl: String = mappingList.head.url + // If there is no project create a new one. Use id as name as well + val project: Project = projects.get(projectId) match { + case Some(existingProject) => existingProject.copy(mappingUrlPrefix = Some(dropLastPart(mappingUrl))) + case None => Project(id = projectId, name = projectId, mappingUrlPrefix = Some(dropLastPart(mappingUrl))) + } + projects.put(projectId, project.copy(mappings = mappingList)) + } // Parse mapping jobs - val jobs: mutable.Map[String, mutable.Map[String, FhirMappingJob]] = mappingJobFolderRepository.getCachedMappingsJobs - jobs.foreach(projectIdAndMappingsJobs => { - val projectId: String = projectIdAndMappingsJobs._1 - // If there is no project create a new one. Use id as name as well - val project: Project = projects.getOrElse(projectId, Project(id = projectId, name = projectId)) - projects.put(projectId, project.copy(mappingJobs = projectIdAndMappingsJobs._2.values.toSeq)) - }) + mappingJobFolderRepository.getProjectPairs.foreach { + case (projectId, jobList) => + // If there is no project create a new one. 
Use id as name as well + val project: Project = projects.getOrElse(projectId, Project(id = projectId, name = projectId)) + projects.put(projectId, project.copy(mappingJobs = jobList)) + } // Parse mapping contexts - val mappingContexts: mutable.Map[String, Seq[String]] = mappingContextRepository.getCachedMappingContexts() - mappingContexts.foreach(mappingContexts => { - val projectId: String = mappingContexts._1 - // If there is no project create a new one. Use id as name as well - val project: Project = projects.getOrElse(projectId, Project(id = projectId, name = projectId)) - projects.put(projectId, project.copy(mappingContexts = mappingContexts._2)) - }) - - projects + mappingContextRepository.getProjectPairs.foreach { + case (projectId, mappingContextIdList) => + // If there is no project create a new one. Use id as name as well + val project: Project = projects.getOrElse(projectId, Project(id = projectId, name = projectId)) + projects.put(projectId, project.copy(mappingContexts = mappingContextIdList)) + } + + projects.toMap } /** diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/FolderRepositoryManager.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/FolderRepositoryManager.scala new file mode 100644 index 000000000..fb517a5a7 --- /dev/null +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/FolderRepositoryManager.scala @@ -0,0 +1,53 @@ +package io.tofhir.server.repository + +import io.tofhir.engine.config.ToFhirEngineConfig +import io.tofhir.server.repository.job.JobFolderRepository +import io.tofhir.server.repository.mapping.ProjectMappingFolderRepository +import io.tofhir.server.repository.mappingContext.MappingContextFolderRepository +import io.tofhir.server.repository.project.ProjectFolderRepository +import io.tofhir.server.repository.schema.SchemaFolderRepository +import io.tofhir.server.repository.terminology.TerminologySystemFolderRepository +import 
io.tofhir.server.repository.terminology.codesystem.CodeSystemFolderRepository +import io.tofhir.server.repository.terminology.conceptmap.ConceptMapFolderRepository + +/** + * Folder/file based implementation of the RepositoryManager where all managed repositories are folder-based. + * + * @param toFhirEngineConfig + */ +class FolderRepositoryManager(toFhirEngineConfig: ToFhirEngineConfig) extends IRepositoryManager { + + override val projectRepository: ProjectFolderRepository = new ProjectFolderRepository(toFhirEngineConfig) + override val mappingRepository: ProjectMappingFolderRepository = new ProjectMappingFolderRepository(toFhirEngineConfig.mappingRepositoryFolderPath, projectRepository) + override val schemaRepository: SchemaFolderRepository = new SchemaFolderRepository(toFhirEngineConfig.schemaRepositoryFolderPath, projectRepository) + override val mappingJobRepository: JobFolderRepository = new JobFolderRepository(toFhirEngineConfig.jobRepositoryFolderPath, projectRepository) + override val mappingContextRepository: MappingContextFolderRepository = new MappingContextFolderRepository(toFhirEngineConfig.mappingContextRepositoryFolderPath, projectRepository) + + override val terminologySystemRepository: TerminologySystemFolderRepository = new TerminologySystemFolderRepository(toFhirEngineConfig.terminologySystemFolderPath) + override val conceptMapRepository: ConceptMapFolderRepository = new ConceptMapFolderRepository(toFhirEngineConfig.terminologySystemFolderPath) + override val codeSystemRepository: CodeSystemFolderRepository = new CodeSystemFolderRepository(toFhirEngineConfig.terminologySystemFolderPath) + + private val folderDBInitializer = new FolderDBInitializer( + projectRepository, + schemaRepository, + mappingRepository, + mappingJobRepository, + mappingContextRepository + ) + + /** + * Initializes the Repository Manager's internal database (the projects.json file) after initialization of + * each individual repository. 
+ */ + override def init(): Unit = { + folderDBInitializer.init() + } + + /** + * Deletes the internal repository database (the projects.json file) for a fresh start (usually after cache invalidate operations) + */ + override def clear(): Unit = { + folderDBInitializer.removeProjectsJsonFile() + } + +} diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/IRepositoryManager.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/IRepositoryManager.scala new file mode 100644 index 000000000..75300829f --- /dev/null +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/IRepositoryManager.scala @@ -0,0 +1,35 @@ +package io.tofhir.server.repository + +import io.tofhir.server.repository.job.IJobRepository +import io.tofhir.server.repository.mapping.IMappingRepository +import io.tofhir.server.repository.mappingContext.IMappingContextRepository +import io.tofhir.server.repository.project.IProjectRepository +import io.tofhir.server.repository.schema.ISchemaRepository +import io.tofhir.server.repository.terminology.ITerminologySystemRepository +import io.tofhir.server.repository.terminology.codesystem.ICodeSystemRepository +import io.tofhir.server.repository.terminology.conceptmap.IConceptMapRepository + +/** + * Manage the repositories throughout toFHIR + */ +trait IRepositoryManager { + val projectRepository: IProjectRepository + val mappingRepository: IMappingRepository + val schemaRepository: ISchemaRepository + val mappingJobRepository: IJobRepository + val mappingContextRepository: IMappingContextRepository + + val terminologySystemRepository: ITerminologySystemRepository + val conceptMapRepository: IConceptMapRepository + val codeSystemRepository: ICodeSystemRepository + + /** + * Initialize the repository + */ + def init(): Unit + + /** + * Clean-up the repository database + */ + def clear(): Unit +} diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/job/IJobRepository.scala 
b/tofhir-server/src/main/scala/io/tofhir/server/repository/job/IJobRepository.scala index b4115af39..03cf2dca1 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/job/IJobRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/job/IJobRepository.scala @@ -1,26 +1,29 @@ package io.tofhir.server.repository.job import io.tofhir.engine.model.FhirMappingJob +import io.tofhir.engine.repository.ICachedRepository +import io.tofhir.server.repository.project.IProjectList -import scala.collection.mutable import scala.concurrent.Future -trait IJobRepository { +trait IJobRepository extends ICachedRepository with IProjectList[FhirMappingJob] { /** - * Returns a map of mapping jobs managed by this repository + * Retrieve all jobs of a project * + * @param projectId project id the jobs belong to * @return */ - def getCachedMappingsJobs: mutable.Map[String, mutable.Map[String, FhirMappingJob]] + def getAllJobs(projectId: String): Future[Seq[FhirMappingJob]] /** - * Retrieve all jobs + * Retrieves the jobs referencing the given mapping in their definitions. * - * @param projectId project id the jobs belong to - * @return + * @param projectId identifier of project whose jobs will be checked + * @param mappingUrl the url of mapping + * @return the jobs referencing the given mapping in their definitions */ - def getAllJobs(projectId: String): Future[Seq[FhirMappingJob]] + def getJobsReferencingMapping(projectId: String, mappingUrl: String): Future[Seq[FhirMappingJob]] /** * Save the job to the repository. @@ -64,14 +67,6 @@ trait IJobRepository { * * @param projectId The unique identifier of the project for which jobs should be deleted. */ - def deleteProjectJobs(projectId: String): Unit + def deleteAllJobs(projectId: String): Future[Unit] - /** - * Retrieves the jobs referencing the given mapping in their definitions. 
- * - * @param projectId identifier of project whose jobs will be checked - * @param mappingUrl the url of mapping - * @return the jobs referencing the given mapping in their definitions - */ - def getJobsReferencingMapping(projectId: String, mappingUrl: String): Future[Seq[FhirMappingJob]] } diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/job/JobFolderRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/job/JobFolderRepository.scala index 471cef605..ce71a551d 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/job/JobFolderRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/job/JobFolderRepository.scala @@ -5,11 +5,10 @@ import io.onfhir.api.util.IOUtil import io.tofhir.engine.Execution.actorSystem.dispatcher import io.tofhir.engine.model.FhirMappingJob import io.tofhir.engine.util.FhirMappingJobFormatter.formats -import io.tofhir.engine.util.{FhirMappingJobFormatter, FileUtils} import io.tofhir.engine.util.FileUtils.FileExtensions -import io.tofhir.server.common.model.{AlreadyExists, BadRequest, ResourceNotFound} -import io.tofhir.server.model._ -import io.tofhir.server.repository.project.ProjectFolderRepository +import io.tofhir.engine.util.{FhirMappingJobFormatter, FileUtils} +import io.tofhir.server.common.model.{AlreadyExists, ResourceNotFound} +import io.tofhir.server.repository.project.IProjectRepository import io.tofhir.server.util.FileOperations import org.json4s.MappingException import org.json4s.jackson.JsonMethods @@ -21,37 +20,28 @@ import scala.collection.mutable import scala.concurrent.Future import scala.io.Source -class JobFolderRepository(jobRepositoryFolderPath: String, projectFolderRepository: ProjectFolderRepository) extends IJobRepository { +class JobFolderRepository(jobRepositoryFolderPath: String, projectRepository: IProjectRepository) extends IJobRepository { private val logger: Logger = Logger(this.getClass) + + // In-memory cache to maintain the 
job definitions. // project id -> mapping job id -> mapping job private val jobDefinitions: mutable.Map[String, mutable.Map[String, FhirMappingJob]] = mutable.Map.empty[String, mutable.Map[String, FhirMappingJob]] + // Initialize the map for the first time initMap(jobRepositoryFolderPath) /** - * Returns the mappings managed by this repository - * - * @return - */ - override def getCachedMappingsJobs: mutable.Map[String, mutable.Map[String, FhirMappingJob]] = { - jobDefinitions - } - - - /** - * Retrieve all jobs + * Retrieve all jobs of a given project * * @param projectId project id the jobs belong to * @return */ override def getAllJobs(projectId: String): Future[Seq[FhirMappingJob]] = { Future { - if (jobDefinitions.contains(projectId)) { - jobDefinitions(projectId).values.toSeq - } else { - Seq.empty - } + jobDefinitions.get(projectId) + .map(_.values.toSeq) // If such a project exists, return the jobs as a sequence + .getOrElse(Seq.empty[FhirMappingJob]) // Else, return an empty list } } @@ -63,18 +53,20 @@ class JobFolderRepository(jobRepositoryFolderPath: String, projectFolderReposito * @return */ override def saveJob(projectId: String, job: FhirMappingJob): Future[FhirMappingJob] = { - if (jobDefinitions.contains(projectId) && jobDefinitions(projectId).contains(job.id)) { + jobDefinitions.get(projectId).flatMap(_.get(job.id)).foreach { _ => throw AlreadyExists("Fhir mapping job already exists.", s"A job definition with id ${job.id} already exists in the job repository at ${FileUtils.getPath(jobRepositoryFolderPath).toAbsolutePath.toString}") } // Write to the repository as a new file - getFileForJob(projectId, job.id).map(file => { + getFileForJob(projectId, job.id).flatMap(file => { val fw = new FileWriter(file) fw.write(writePretty(job)) fw.close() - // add the job to the project repo and the map - projectFolderRepository.addJob(projectId, job) - jobDefinitions.getOrElseUpdate(projectId, mutable.Map.empty).put(job.id, job) - job + // Update the internal 
cache of job repository + jobDefinitions.getOrElseUpdate(projectId, mutable.Map.empty[String, FhirMappingJob]).put(job.id, job) + // add the job to the project repository + projectRepository.addJob(projectId, job) map { _ => + job + } }) } @@ -87,7 +79,7 @@ class JobFolderRepository(jobRepositoryFolderPath: String, projectFolderReposito */ override def getJob(projectId: String, jobId: String): Future[Option[FhirMappingJob]] = { Future { - jobDefinitions(projectId).get(jobId) + jobDefinitions.get(projectId).flatMap(_.get(jobId)) } } @@ -100,19 +92,20 @@ class JobFolderRepository(jobRepositoryFolderPath: String, projectFolderReposito * @return */ override def updateJob(projectId: String, jobId: String, job: FhirMappingJob): Future[FhirMappingJob] = { - if (!jobDefinitions.contains(projectId) || !jobDefinitions(projectId).contains(jobId)) { + if (!jobDefinitions.get(projectId).exists(_.contains(jobId))) { throw ResourceNotFound("Mapping job does not exists.", s"A mapping job with id $jobId does not exists in the mapping job repository at ${FileUtils.getPath(jobRepositoryFolderPath).toAbsolutePath.toString}") } // update the job in the repository - getFileForJob(projectId, job.id).map(file => { + getFileForJob(projectId, job.id).flatMap(file => { val fw = new FileWriter(file) fw.write(writePretty(job)) fw.close() // update the mapping job in the map jobDefinitions(projectId).put(jobId, job) // update the job in the project - projectFolderRepository.updateJob(projectId, job) - job + projectRepository.updateJob(projectId, job) map { _ => + job + } }) } @@ -124,16 +117,16 @@ class JobFolderRepository(jobRepositoryFolderPath: String, projectFolderReposito * @return */ override def deleteJob(projectId: String, jobId: String): Future[Unit] = { - if (!jobDefinitions.contains(projectId) || !jobDefinitions(projectId).contains(jobId)) { + if (!jobDefinitions.get(projectId).exists(_.contains(jobId))) { throw ResourceNotFound("Mapping job does not exists.", s"A mapping job with id 
$jobId does not exists in the mapping job repository at ${FileUtils.getPath(jobRepositoryFolderPath).toAbsolutePath.toString}") } // delete the mapping job from the repository - getFileForJob(projectId, jobDefinitions(projectId)(jobId).id).map(file => { + getFileForJob(projectId, jobDefinitions(projectId)(jobId).id).flatMap(file => { file.delete() jobDefinitions(projectId).remove(jobId) // delete the job from the project - projectFolderRepository.deleteJob(projectId, Some(jobId)) + projectRepository.deleteJob(projectId, Some(jobId)) }) } @@ -142,13 +135,16 @@ class JobFolderRepository(jobRepositoryFolderPath: String, projectFolderReposito * * @param projectId The unique identifier of the project for which jobs should be deleted. */ - override def deleteProjectJobs(projectId: String): Unit = { - // delete job definitions for the project - org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.getPath(jobRepositoryFolderPath, projectId).toFile) - // remove project from the cache - jobDefinitions.remove(projectId) - // delete project jobs - projectFolderRepository.deleteJob(projectId) + override def deleteAllJobs(projectId: String): Future[Unit] = { + Future { + // delete job definitions for the project + org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.getPath(jobRepositoryFolderPath, projectId).toFile) + // remove project from the cache + jobDefinitions.remove(projectId) + } flatMap { _ => + // delete project jobs + projectRepository.deleteJob(projectId) + } } /** @@ -160,8 +156,8 @@ class JobFolderRepository(jobRepositoryFolderPath: String, projectFolderReposito */ override def getJobsReferencingMapping(projectId: String, mappingUrl: String): Future[Seq[FhirMappingJob]] = { Future { - val jobs: Seq[FhirMappingJob] = jobDefinitions.getOrElse(key = projectId, default = Map.empty).values.toSeq - jobs.filter(job => job.mappings.map(mappingTask => mappingTask.mappingRef).contains(mappingUrl)) + val jobs = jobDefinitions.get(projectId).toSeq.flatMap(_.values) 
+ jobs.filter(_.mappings.exists(_.mappingRef == mappingUrl)) } } @@ -173,7 +169,7 @@ class JobFolderRepository(jobRepositoryFolderPath: String, projectFolderReposito * @return */ private def getFileForJob(projectId: String, jobId: String): Future[File] = { - projectFolderRepository.getProject(projectId) map { project => + projectRepository.getProject(projectId) map { project => if (project.isEmpty) throw new IllegalStateException(s"This should not be possible. ProjectId: $projectId does not exist in the project folder repository.") FileOperations.getFileForEntityWithinProject(jobRepositoryFolderPath, project.get.id, jobId) } @@ -224,10 +220,22 @@ class JobFolderRepository(jobRepositoryFolderPath: String, projectFolderReposito /** * Reload the job definitions from the given folder + * * @return */ - def reloadJobDefinitions(): Unit = { + override def invalidate(): Unit = { this.jobDefinitions.clear() initMap(jobRepositoryFolderPath) } + + /** + * Retrieve the projects and FhirMappingJobs within. 
+ * + * @return Map of projectId -> Seq[FhirMappingJob] + */ + override def getProjectPairs: Map[String, Seq[FhirMappingJob]] = { + jobDefinitions.map { case (projectId, jobPairs) => + projectId -> jobPairs.values.toSeq + }.toMap + } } diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/mapping/IMappingRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/mapping/IMappingRepository.scala index cd819e6e5..37bce2bcf 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/mapping/IMappingRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/mapping/IMappingRepository.scala @@ -1,14 +1,15 @@ package io.tofhir.server.repository.mapping import io.tofhir.engine.model.FhirMapping -import io.tofhir.engine.repository.mapping.IFhirMappingCachedRepository +import io.tofhir.engine.repository.mapping.IFhirMappingRepository +import io.tofhir.server.repository.project.IProjectList import scala.concurrent.Future /** * Interface to save and load mappings so that the client applications can manage the mappings through CRUD operations */ -trait IMappingRepository extends IFhirMappingCachedRepository { +trait IMappingRepository extends IFhirMappingRepository with IProjectList[FhirMapping] { /** * Retrieve all mappings for the given project @@ -60,7 +61,7 @@ trait IMappingRepository extends IFhirMappingCachedRepository { * * @param projectId The unique identifier of the project for which mappings should be deleted. */ - def deleteProjectMappings(projectId: String): Unit + def deleteAllMappings(projectId: String): Future[Unit] /** * Retrieves the identifiers of mappings referencing the given schema in their definitions. 
diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/mapping/ProjectMappingFolderRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/mapping/ProjectMappingFolderRepository.scala index 07ce92f37..a568e7443 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/mapping/ProjectMappingFolderRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/mapping/ProjectMappingFolderRepository.scala @@ -7,8 +7,8 @@ import io.tofhir.engine.Execution.actorSystem.dispatcher import io.tofhir.engine.model.FhirMapping import io.tofhir.engine.util.FileUtils import io.tofhir.engine.util.FileUtils.FileExtensions -import io.tofhir.server.common.model.{AlreadyExists, BadRequest, ResourceNotFound} -import io.tofhir.server.repository.project.ProjectFolderRepository +import io.tofhir.server.common.model.{AlreadyExists, ResourceNotFound} +import io.tofhir.server.repository.project.IProjectRepository import io.tofhir.server.util.FileOperations import org.json4s.jackson.JsonMethods import org.json4s.jackson.Serialization.writePretty @@ -24,9 +24,9 @@ import scala.io.Source * It also extends the engine's folder-based mapping repository implementation to be able to use the same business logic to load mappings from folders. 
 * * @param mappingRepositoryFolderPath root folder path to the mapping repository - * @param projectFolderRepository project repository to update corresponding projects based on updates on the mappings + * @param projectRepository project repository to update corresponding projects based on updates on the mappings */ -class ProjectMappingFolderRepository(mappingRepositoryFolderPath: String, projectFolderRepository: ProjectFolderRepository) extends IMappingRepository { +class ProjectMappingFolderRepository(mappingRepositoryFolderPath: String, projectRepository: IProjectRepository) extends IMappingRepository { private val logger: Logger = Logger(this.getClass) @@ -36,26 +36,16 @@ class ProjectMappingFolderRepository(mappingRepositoryFolderPath: String, projec initMap(mappingRepositoryFolderPath) /** - * Returns the mappings managed by this repository - * - * @return - */ - def getCachedMappings(): mutable.Map[String, mutable.Map[String, FhirMapping]] = { - mappingDefinitions - } - - /** - * Retrieve the metadata of all MappingFile (only url, type and name fields are populated) + * Retrieve all mappings of a given project * + * @param projectId project id the mappings belong to + * @return */ override def getAllMappings(projectId: String): Future[Seq[FhirMapping]] = { Future { - if (mappingDefinitions.contains(projectId)) { - mappingDefinitions(projectId).values.toSeq - } else { - Seq.empty - } + mappingDefinitions.get(projectId) .map(_.values.toSeq) // If such a project exists, return the mappings as a sequence + .getOrElse(Seq.empty[FhirMapping]) // Else, return an empty list } } @@ -68,25 +58,28 @@ */ override def saveMapping(projectId: String, mapping: FhirMapping): Future[FhirMapping] = { // validate that mapping id is unique - if (mappingDefinitions.contains(projectId) && mappingDefinitions(projectId).contains(mapping.id)) { + 
mappingDefinitions.get(projectId).flatMap(_.get(mapping.id)).foreach { _ => throw AlreadyExists("Fhir mapping already exists.", s"A mapping definition with id ${mapping.id} already exists in the mapping repository at ${FileUtils.getPath(mappingRepositoryFolderPath).toAbsolutePath.toString}") } // validate that mapping url is unique - if (mappingDefinitions.contains(projectId) && mappingDefinitions(projectId).exists(mD => mD._2.url.contentEquals(mapping.url))) { - throw AlreadyExists("Fhir mapping already exists.", s"A mapping definition with url ${mapping.url} already exists in the mapping repository at ${FileUtils.getPath(mappingRepositoryFolderPath).toAbsolutePath.toString}") + mappingDefinitions.get(projectId).foreach { mappings => + if (mappings.values.exists(_.url.contentEquals(mapping.url))) { + throw AlreadyExists("Fhir mapping already exists.", s"A mapping definition with url ${mapping.url} already exists in the mapping repository at ${FileUtils.getPath(mappingRepositoryFolderPath).toAbsolutePath.toString}") + } } // Write to the repository as a new file - getFileForMapping(projectId, mapping.id).map(newFile => { + getFileForMapping(projectId, mapping.id).flatMap(newFile => { val fw = new FileWriter(newFile) fw.write(writePretty(mapping)) fw.close() // Add to the project metadata json file - projectFolderRepository.addMapping(projectId, mapping) - // Add to the in-memory map - mappingDefinitions.getOrElseUpdate(projectId, mutable.Map.empty).put(mapping.id, mapping) - mapping + projectRepository.addMapping(projectId, mapping) map { _ => + // Add to the in-memory map + mappingDefinitions.getOrElseUpdate(projectId, mutable.Map.empty).put(mapping.id, mapping) + mapping + } }) } @@ -113,23 +106,27 @@ class ProjectMappingFolderRepository(mappingRepositoryFolderPath: String, projec * @throws AlreadyExists if a mapping with the same URL already exists */ override def updateMapping(projectId: String, mappingId: String, mapping: FhirMapping): Future[FhirMapping] = { 
- if (!mappingDefinitions.contains(projectId) || !mappingDefinitions(projectId).contains(mappingId)) { + if (!mappingDefinitions.get(projectId).exists(_.contains(mappingId))) { throw ResourceNotFound("Mapping does not exists.", s"A mapping with id $mappingId does not exists in the mapping repository at ${FileUtils.getPath(mappingRepositoryFolderPath).toAbsolutePath.toString}") } // validate that mapping url is unique - if (mappingDefinitions.contains(projectId) && mappingDefinitions(projectId).exists(mD => !mD._1.contentEquals(mappingId) && mD._2.url.contentEquals(mapping.url))) { - throw AlreadyExists("Fhir mapping already exists.", s"A mapping definition with url ${mapping.url} already exists in the mapping repository at ${FileUtils.getPath(mappingRepositoryFolderPath).toAbsolutePath.toString}") + mappingDefinitions.get(projectId).foreach { mappings => + if (mappings.exists { case (id, definition) => !id.contentEquals(mappingId) && definition.url.contentEquals(mapping.url) }) { + throw AlreadyExists("Fhir mapping already exists.", s"A mapping definition with url ${mapping.url} already exists in the mapping repository at ${FileUtils.getPath(mappingRepositoryFolderPath).toAbsolutePath.toString}") + } } + // update the mapping in the repository - getFileForMapping(projectId, mapping.id).map(file => { + getFileForMapping(projectId, mapping.id).flatMap(file => { val fw = new FileWriter(file) fw.write(writePretty(mapping)) fw.close() // update the mapping in the in-memory map mappingDefinitions(projectId).put(mappingId, mapping) // update the projects metadata json file - projectFolderRepository.updateMapping(projectId, mapping) - mapping + projectRepository.updateMapping(projectId, mapping) map { _ => + mapping + } }) } @@ -141,16 +138,16 @@ class ProjectMappingFolderRepository(mappingRepositoryFolderPath: String, projec * @return */ override def deleteMapping(projectId: String, mappingId: String): Future[Unit] = { - if (!mappingDefinitions.contains(projectId) || 
!mappingDefinitions(projectId).contains(mappingId)) { + if (!mappingDefinitions.get(projectId).exists(_.contains(mappingId))) { throw ResourceNotFound("Mapping does not exists.", s"A mapping with id $mappingId does not exists in the mapping repository at ${FileUtils.getPath(mappingRepositoryFolderPath).toAbsolutePath.toString}") } // delete the mapping from the repository - getFileForMapping(projectId, mappingDefinitions(projectId)(mappingId).id).map(file => { + getFileForMapping(projectId, mappingDefinitions(projectId)(mappingId).id).flatMap(file => { file.delete() mappingDefinitions(projectId).remove(mappingId) // delete the mapping from projects json file - projectFolderRepository.deleteMapping(projectId, Some(mappingId)) + projectRepository.deleteMapping(projectId, Some(mappingId)) }) } @@ -159,13 +156,16 @@ class ProjectMappingFolderRepository(mappingRepositoryFolderPath: String, projec * * @param projectId The unique identifier of the project for which mappings should be deleted. */ - override def deleteProjectMappings(projectId: String): Unit = { - // delete mapping definitions for the project - org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.getPath(mappingRepositoryFolderPath, projectId).toFile) - // remove project from the cache - mappingDefinitions.remove(projectId) - // delete project mappings - projectFolderRepository.deleteMapping(projectId) + override def deleteAllMappings(projectId: String): Future[Unit] = { + Future { + // delete mapping definitions for the project + org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.getPath(mappingRepositoryFolderPath, projectId).toFile) + // remove project from the cache + mappingDefinitions.remove(projectId) + } flatMap { _ => + // delete project mappings + projectRepository.deleteMapping(projectId) + } } /** @@ -191,7 +191,7 @@ class ProjectMappingFolderRepository(mappingRepositoryFolderPath: String, projec * @return */ private def getFileForMapping(projectId: String, fhirMappingId: String): 
Future[File] = { - projectFolderRepository.getProject(projectId) map { project => + projectRepository.getProject(projectId) map { project => if (project.isEmpty) throw new IllegalStateException(s"This should not be possible. ProjectId: $projectId does not exist in the project folder repository.") FileOperations.getFileForEntityWithinProject(mappingRepositoryFolderPath, project.get.id, fhirMappingId) } @@ -229,7 +229,7 @@ class ProjectMappingFolderRepository(mappingRepositoryFolderPath: String, projec System.exit(1) } } - if(fhirMappingMap.isEmpty) { + if (fhirMappingMap.isEmpty) { // No processable schema files under projectDirectory logger.warn(s"There are no processable mapping files under ${projectDirectory.getAbsolutePath}. Skipping ${projectDirectory.getName}.") } else { @@ -255,15 +255,19 @@ class ProjectMappingFolderRepository(mappingRepositoryFolderPath: String, projec * Invalidate the internal cache and refresh the cache with the FhirMappings directly from their source */ override def invalidate(): Unit = { - // nothing needs to be done as we keep the cache always up-to-date + this.mappingDefinitions.clear() + initMap(mappingRepositoryFolderPath) } /** - * Reload the mapping definitions from the given folder - * @return + * Retrieve the projects and FhirMappings within. 
+ * + * @return Map of projectId -> Seq[FhirMapping] */ - def reloadMappingDefinitions(): Unit = { - this.mappingDefinitions.clear() - initMap(mappingRepositoryFolderPath) + override def getProjectPairs: Map[String, Seq[FhirMapping]] = { + mappingDefinitions.map { case (projectId, mappingPairs) => + projectId -> mappingPairs.values.toSeq + }.toMap } + } diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/mappingContext/IMappingContextRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/mappingContext/IMappingContextRepository.scala index 33c4cb365..8ee822135 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/mappingContext/IMappingContextRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/mappingContext/IMappingContextRepository.scala @@ -2,14 +2,16 @@ package io.tofhir.server.repository.mappingContext import akka.stream.scaladsl.Source import akka.util.ByteString +import io.tofhir.engine.repository.ICachedRepository import io.tofhir.server.model.csv.CsvHeader +import io.tofhir.server.repository.project.IProjectList import scala.concurrent.Future /** * Interface for the mapping context repository */ -trait IMappingContextRepository { +trait IMappingContextRepository extends ICachedRepository with IProjectList[String] { /** * Retrieve the metadata of all mapping context ids @@ -40,7 +42,7 @@ trait IMappingContextRepository { * * @param projectId The unique identifier of the project for which mapping contexts should be deleted. 
*/ - def deleteProjectMappingContexts(projectId: String): Unit + def deleteAllMappingContexts(projectId: String): Unit /** * Update the mapping context header by its id diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/mappingContext/MappingContextFolderRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/mappingContext/MappingContextFolderRepository.scala index 3590bc13d..806e434e7 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/mappingContext/MappingContextFolderRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/mappingContext/MappingContextFolderRepository.scala @@ -8,7 +8,7 @@ import io.tofhir.engine.util.FileUtils import io.tofhir.server.common.model.{AlreadyExists, ResourceNotFound} import io.tofhir.server.model._ import io.tofhir.server.model.csv.CsvHeader -import io.tofhir.server.repository.project.ProjectFolderRepository +import io.tofhir.server.repository.project.IProjectRepository import io.tofhir.server.util.CsvUtil import java.io.File @@ -21,21 +21,12 @@ import scala.concurrent.Future * * @param mappingContextRepositoryFolderPath root folder path to the mapping context repository */ -class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, projectFolderRepository: ProjectFolderRepository) extends IMappingContextRepository { +class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, projectRepository: IProjectRepository) extends IMappingContextRepository { // project id -> mapping context id private val mappingContextDefinitions: mutable.Map[String, Seq[String]] = mutable.Map.empty[String, Seq[String]] // Initialize the map for the first time initMap(mappingContextRepositoryFolderPath) - /** - * Returns the mapping context cached in memory - * - * @return - */ - def getCachedMappingContexts(): mutable.Map[String, Seq[String]] = { - mappingContextDefinitions - } - /** * Retrieve the metadata of all mapping context 
ids * @@ -43,11 +34,7 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, */ override def getAllMappingContext(projectId: String): Future[Seq[String]] = { Future { - if (mappingContextDefinitions.contains(projectId)) { - mappingContextDefinitions(projectId) - } else { - Seq.empty - } + mappingContextDefinitions.getOrElse(projectId, Seq.empty[String]) } } @@ -55,22 +42,23 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, * Save the mapping context to the repository. * We only store id of a mapping context in the project metadata json file. * - * @param projectId project id the mapping context belongs to - * @param id mapping context id to save + * @param projectId project id the mapping context belongs to + * @param mappingContextId mapping context id to save * @return */ - override def createMappingContext(projectId: String, id: String): Future[String] = { - if (mappingContextExists(projectId, id)) { - throw AlreadyExists("Fhir mapping already exists.", s"A mapping context definition with id ${id} already exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") + override def createMappingContext(projectId: String, mappingContextId: String): Future[String] = { + if (mappingContextExists(projectId, mappingContextId)) { + throw AlreadyExists("Fhir mapping already exists.", s"A mapping context definition with id $mappingContextId already exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") } // Write to the repository as a new file - getFileForMappingContext(projectId, id).map(newFile => { + getFileForMappingContext(projectId, mappingContextId).flatMap(newFile => { newFile.createNewFile() // Add to the project metadata json file - projectFolderRepository.addMappingContext(projectId, id) - // Add to the in-memory map - mappingContextDefinitions(projectId) = 
mappingContextDefinitions.getOrElseUpdate(projectId, Seq.empty) :+ id - id + projectRepository.addMappingContext(projectId, mappingContextId) map { _ => + // Add to the in-memory map + mappingContextDefinitions(projectId) = mappingContextDefinitions.getOrElseUpdate(projectId, Seq.empty) :+ mappingContextId + mappingContextId + } }) } @@ -78,21 +66,21 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, /** * Delete the mapping context from the repository * - * @param projectId project id the mapping context belongs to - * @param id mapping context id + * @param projectId project id the mapping context belongs to + * @param mappingContextId mapping context id * @return */ - override def deleteMappingContext(projectId: String, id: String): Future[Unit] = { - if (!mappingContextExists(projectId, id)) { - throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $id does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") + override def deleteMappingContext(projectId: String, mappingContextId: String): Future[Unit] = { + if (!mappingContextExists(projectId, mappingContextId)) { + throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $mappingContextId does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") } // delete the mapping context from the repository - getFileForMappingContext(projectId, id).map(file => { + getFileForMappingContext(projectId, mappingContextId).flatMap(file => { file.delete() - mappingContextDefinitions(projectId) = mappingContextDefinitions(projectId).filterNot(_ == id) + mappingContextDefinitions(projectId) = mappingContextDefinitions(projectId).filterNot(_ == mappingContextId) // update the projects metadata json file - projectFolderRepository.deleteMappingContext(projectId, Some(id)) + 
projectRepository.deleteMappingContext(projectId, Some(mappingContextId)) }) } @@ -101,28 +89,32 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, * * @param projectId The unique identifier of the project for which mapping contexts should be deleted. */ - override def deleteProjectMappingContexts(projectId: String): Unit = { - // delete mapping context definitions for the project - org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.getPath(mappingContextRepositoryFolderPath, projectId).toFile) - // remove project from the cache - mappingContextDefinitions.remove(projectId) - // delete project mapping contexts - projectFolderRepository.deleteMappingContext(projectId) + override def deleteAllMappingContexts(projectId: String): Unit = { + Future { + // delete mapping context definitions for the project + org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.getPath(mappingContextRepositoryFolderPath, projectId).toFile) + // remove project from the cache + mappingContextDefinitions.remove(projectId) + } flatMap { _ => + // delete project mapping contexts + projectRepository.deleteMappingContext(projectId) + } } /** * Update the mapping context header by its id - * @param projectId project id the mapping context belongs to - * @param id mapping context id - * @param headers mapping context headers + * + * @param projectId project id the mapping context belongs to + * @param mappingContextId mapping context id + * @param headers mapping context headers * @return */ - def updateMappingContextHeader(projectId: String, id: String, headers: Seq[CsvHeader]): Future[Unit] = { - if (!mappingContextExists(projectId, id)) { - throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $id does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") + def updateMappingContextHeader(projectId: String, mappingContextId: String, headers: 
Seq[CsvHeader]): Future[Unit] = { + if (!mappingContextExists(projectId, mappingContextId)) { + throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $mappingContextId does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") } // Write headers to the first row in the related file in the repository - getFileForMappingContext(projectId, id).map(file => { + getFileForMappingContext(projectId, mappingContextId).map(file => { CsvUtil.writeCsvHeaders(file, headers) }) } @@ -130,17 +122,17 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, /** * Save the mapping context content to the repository * - * @param projectId project id the mapping context belongs to - * @param id mapping context id - * @param content mapping context content to save + * @param projectId project id the mapping context belongs to + * @param mappingContextId mapping context id + * @param content mapping context content to save * @return */ - override def saveMappingContextContent(projectId: String, id: String, content: Source[ByteString, Any], pageNumber: Int, pageSize: Int): Future[Long] = { - if (!mappingContextExists(projectId, id)) { - throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $id does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") + override def saveMappingContextContent(projectId: String, mappingContextId: String, content: Source[ByteString, Any], pageNumber: Int, pageSize: Int): Future[Long] = { + if (!mappingContextExists(projectId, mappingContextId)) { + throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $mappingContextId does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") } // Write content to the 
related file in the repository - getFileForMappingContext(projectId, id).flatMap(file => { + getFileForMappingContext(projectId, mappingContextId).flatMap(file => { CsvUtil.writeCsvAndReturnRowNumber(file, content, pageNumber, pageSize) }) } @@ -148,16 +140,16 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, /** * Get the mapping context content by its id * - * @param projectId project id the mapping context belongs to - * @param id mapping context id + * @param projectId project id the mapping context belongs to + * @param mappingContextId mapping context id * @return */ - override def getMappingContextContent(projectId: String, id: String, pageNumber: Int, pageSize: Int): Future[(Source[ByteString, Any], Long)] = { - if (!mappingContextExists(projectId, id)) { - throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $id does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") + override def getMappingContextContent(projectId: String, mappingContextId: String, pageNumber: Int, pageSize: Int): Future[(Source[ByteString, Any], Long)] = { + if (!mappingContextExists(projectId, mappingContextId)) { + throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $mappingContextId does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") } // Read content from the related file in the repository - getFileForMappingContext(projectId, id).flatMap(file => { + getFileForMappingContext(projectId, mappingContextId).flatMap(file => { CsvUtil.getPaginatedCsvContent(file, pageNumber, pageSize) }) } @@ -165,17 +157,17 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, /** * Upload the mapping context content to the repository * - * @param projectId project id the mapping context belongs to - * @param id 
mapping context id - * @param content mapping context content to save + * @param projectId project id the mapping context belongs to + * @param mappingContextId mapping context id + * @param content mapping context content to save * @return */ - override def uploadMappingContext(projectId: String, id: String, content: Source[ByteString, Any]): Future[Unit] = { - if (!mappingContextExists(projectId, id)) { - throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $id does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") + override def uploadMappingContext(projectId: String, mappingContextId: String, content: Source[ByteString, Any]): Future[Unit] = { + if (!mappingContextExists(projectId, mappingContextId)) { + throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $mappingContextId does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") } // Write content to the related file in the repository - getFileForMappingContext(projectId, id).map(file => { + getFileForMappingContext(projectId, mappingContextId).map(file => { CsvUtil.saveFileContent(file, content) }) } @@ -183,16 +175,16 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, /** * Download the mapping context content by its id * - * @param projectId project id the mapping context belongs to - * @param id mapping context id + * @param projectId project id the mapping context belongs to + * @param mappingContextId mapping context id * @return */ - override def downloadMappingContext(projectId: String, id: String): Future[Source[ByteString, Any]] = { - if (!mappingContextExists(projectId, id)) { - throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $id does not exists in the mapping context repository at 
${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") + override def downloadMappingContext(projectId: String, mappingContextId: String): Future[Source[ByteString, Any]] = { + if (!mappingContextExists(projectId, mappingContextId)) { + throw ResourceNotFound("Mapping context does not exists.", s"A mapping context with id $mappingContextId does not exists in the mapping context repository at ${FileUtils.getPath(mappingContextRepositoryFolderPath).toAbsolutePath.toString}") } // Read content from the related file in the repository - getFileForMappingContext(projectId, id).map(file => { + getFileForMappingContext(projectId, mappingContextId).map(file => { FileIO.fromPath(file.toPath) }) } @@ -200,12 +192,12 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, /** * Checks if the mapping context exists in the repository * - * @param projectId project id the mapping context belongs to - * @param id mapping context id + * @param projectId project id the mapping context belongs to + * @param mappingContextId mapping context id * @return */ - private def mappingContextExists(projectId: String, id: String): Boolean = { - mappingContextDefinitions.contains(projectId) && mappingContextDefinitions(projectId).contains(id) + private def mappingContextExists(projectId: String, mappingContextId: String): Boolean = { + mappingContextDefinitions.get(projectId).exists(_.contains(mappingContextId)) } /** @@ -215,7 +207,7 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, * @return */ private def getFileForMappingContext(projectId: String, fhirMappingContextId: String): Future[File] = { - val projectFuture: Future[Option[Project]] = projectFolderRepository.getProject(projectId) + val projectFuture: Future[Option[Project]] = projectRepository.getProject(projectId) projectFuture.map(project => { val file: File = FileUtils.getPath(mappingContextRepositoryFolderPath, project.get.id, 
fhirMappingContextId).toFile // If the project folder does not exist, create it @@ -229,6 +221,7 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, /** * Initializes the mapping context map from the repository * Map of (project id -> mapping context list) + * * @param mappingContextRepositoryFolderPath path to the mapping context repository * @return */ @@ -248,10 +241,20 @@ class MappingContextFolderRepository(mappingContextRepositoryFolderPath: String, /** * Reload the mapping context definitions from the given folder + * * @return */ - def reloadMappingContextDefinitions(): Unit = { + override def invalidate(): Unit = { this.mappingContextDefinitions.clear() initMap(mappingContextRepositoryFolderPath) } + + /** + * Retrieve the projects and MappingContextIds within. + * + * @return Map of projectId -> Seq[String] + */ + override def getProjectPairs: Map[String, Seq[String]] = { + mappingContextDefinitions.toMap + } } diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/project/IProjectList.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/project/IProjectList.scala new file mode 100644 index 000000000..4707ee897 --- /dev/null +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/project/IProjectList.scala @@ -0,0 +1,16 @@ +package io.tofhir.server.repository.project + +/** + * Interface to handle the projects referred by an entity. + * This interface is intended to provide the necessary methods to repository implementations (e.g. JobFolderRepository) + * which relates back to Projects. + */ +trait IProjectList[T] { + + /** + * Retrieve the projects and entities within. + * @return Map of projectId -> Seq[T] where T is the entity (e.g. 
FhirMapping) + */ + def getProjectPairs: Map[String, Seq[T]] + +} diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/project/IProjectRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/project/IProjectRepository.scala index b65f3c85c..3c0e0fa89 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/project/IProjectRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/project/IProjectRepository.scala @@ -1,5 +1,7 @@ package io.tofhir.server.repository.project +import io.onfhir.definitions.common.model.SchemaDefinition +import io.tofhir.engine.model.{FhirMapping, FhirMappingJob} import io.tofhir.server.model.Project import org.json4s.JObject @@ -17,6 +19,8 @@ trait IProjectRepository { */ def getAllProjects: Future[Seq[Project]] + + /** * Save project to the repository. * @@ -34,7 +38,7 @@ trait IProjectRepository { def getProject(id: String): Future[Option[Project]] /** - * Update some fields of project in the repository. + * Update some fields of the project in the repository. * * @param id id of the project * @param patch patch to be applied to the project @@ -48,5 +52,102 @@ trait IProjectRepository { * @param id id of the project * @return */ - def removeProject(id: String): Future[Unit] + def deleteProject(id: String): Future[Unit] + + /** + * Methods to manage the reference from a Project to SchemaDefinitions + */ + + /** + * Add the schema definition to the project. + * + * @param projectId + * @param schema + */ + def addSchema(projectId: String, schema: SchemaDefinition): Future[Project] + + /** + * Replace the schema definition of the project. + * + * @param projectId + * @param schemaMetadata + */ + def updateSchema(projectId: String, schemaMetadata: SchemaDefinition): Future[Project] + + /** + * Remove the schema definition from the project + * + * @param projectId + * @param schemaId The unique identifier of the schema to be deleted. 
If not provided, all schemas will be removed from the project definition. + */ + def deleteSchema(projectId: String, schemaId: Option[String] = None): Future[Unit] + + /** + * Methods to manage the reference from a Project to FhirMappings + */ + + /** + * Add the mapping definition to the project + * + * @param projectId Project id to which the mapping will be added to + * @param mapping Mapping to be added + */ + def addMapping(projectId: String, mapping: FhirMapping): Future[Project] + + /** + * Replaces the mapping definition within the project + * + * @param projectId Project id of which the mapping will be updated + * @param mapping Mapping to be updated + */ + def updateMapping(projectId: String, mapping: FhirMapping): Future[Project] + + /** + * Deletes the mapping definition from the project + * + * @param projectId Project id from which the mapping will be deleted + * @param mappingId The unique identifier of the mapping to be deleted. If not provided, all mappings for the project will be deleted. + */ + def deleteMapping(projectId: String, mappingId: Option[String] = None): Future[Unit] + + /** + * Adds the job definition to the project + * + * @param projectId + * @param job + */ + def addJob(projectId: String, job: FhirMappingJob): Future[Project] + + /** + * Replaces the job definition within the project + * + * @param projectId + * @param job + */ + def updateJob(projectId: String, job: FhirMappingJob): Future[Project] + + /** + * Deletes the job definition of the project + * + * @param projectId + * @param jobId The unique identifier of the job to be deleted. If not provided, all jobs for the project will be deleted. 
+ */ + def deleteJob(projectId: String, jobId: Option[String] = None): Future[Unit] + + /** + * Adds the mapping context id to the project + * + * @param projectId Project id the mapping context will be added to + * @param mappingContext Mapping context id to be added + */ + def addMappingContext(projectId: String, mappingContext: String): Future[Project] + + /** + * Deletes the mapping context from the project + * + * @param projectId Project id the mapping context will be deleted from + * @param mappingContextId The unique identifier of the mapping context to be deleted. If not provided, all mapping contexts for the project will be deleted. + */ + def deleteMappingContext(projectId: String, mappingContextId: Option[String] = None): Future[Unit] + } diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/project/ProjectFolderRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/project/ProjectFolderRepository.scala index 2b87bf67c..a16be8f24 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/project/ProjectFolderRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/project/ProjectFolderRepository.scala @@ -27,15 +27,17 @@ class ProjectFolderRepository(config: ToFhirEngineConfig) extends IProjectReposi private val logger: Logger = Logger(this.getClass) // Project cache keeping the up-to-date list of projects - private var projects: mutable.Map[String, Project] = mutable.Map.empty + private val projects: mutable.Map[String, Project] = mutable.Map.empty /** - * Initializes the projects cache + * Initializes the projects cache with the given Map of projects and persists them to the file (projects.json) * * @param projects */ - def setProjects(projects: mutable.Map[String, Project]): Unit = { - this.projects = projects + def setProjects(vProjects: Map[String, Project]): Unit = { + this.projects.clear() + this.projects ++= vProjects + this.updateProjectsMetadata() } /** @@ -111,21 +113,20 @@ class 
ProjectFolderRepository(config: ToFhirEngineConfig) extends IProjectReposi updatedProject } } + /** * Delete the project from the repository. * * @param id id of the project * @return */ - override def removeProject(id: String): Future[Unit] = { + override def deleteProject(id: String): Future[Unit] = { Future { // validate that the project exists if (!projects.contains(id)) throw ResourceNotFound("Project does not exist.", s"Project $id not found") - // remove the project from the cache projects.remove(id) - // update projects metadata with the remaining ones updateProjectsMetadata() } @@ -137,11 +138,15 @@ class ProjectFolderRepository(config: ToFhirEngineConfig) extends IProjectReposi * @param projectId * @param schema */ - def addSchema(projectId: String, schema: SchemaDefinition): Unit = { - val project: Project = projects(projectId) - projects.put(projectId, project.copy(schemas = project.schemas :+ schema)) - updateProjectsMetadata() + override def addSchema(projectId: String, schema: SchemaDefinition): Future[Project] = { + Future { + val project: Project = projects(projectId) + val updatedProject = project.copy(schemas = project.schemas :+ schema) + projects.put(projectId, updatedProject) + updateProjectsMetadata() + updatedProject + } } /** @@ -150,88 +155,100 @@ class ProjectFolderRepository(config: ToFhirEngineConfig) extends IProjectReposi * @param projectId * @param schemaMetadata */ - def updateSchema(projectId: String, schemaMetadata: SchemaDefinition): Unit = { - deleteSchema(projectId, Some(schemaMetadata.id)) - addSchema(projectId, schemaMetadata) + override def updateSchema(projectId: String, schemaMetadata: SchemaDefinition): Future[Project] = { + deleteSchema(projectId, Some(schemaMetadata.id)) flatMap { _ => + addSchema(projectId, schemaMetadata) + } } /** - * Deletes the schema definition of the project + * Deletes the schema definition (or all schema definitions) from the project. 
* * @param projectId - * @param schemaId The unique identifier of the schema to be deleted. If not provided, all schemas for the project will be deleted. + * @param schemaId The unique identifier of the schema to be deleted. If not provided, all schemas will be removed from the project definition. */ - def deleteSchema(projectId: String, schemaId: Option[String] = None): Unit = { - val project: Option[Project] = projects.get(projectId) - project match { - case Some(project) => - schemaId match { - case Some(id) => - projects.put(projectId, project.copy(schemas = project.schemas.filterNot(s => s.id.equals(id)))) - case None => - projects.put(projectId, project.copy(schemas = Seq.empty)) - } - updateProjectsMetadata() - case None => // Do nothing if project is deleted before the schema + override def deleteSchema(projectId: String, schemaId: Option[String] = None): Future[Unit] = { + Future { + val project: Option[Project] = projects.get(projectId) + project match { + case Some(project) => + schemaId match { + case Some(id) => + projects.put(projectId, project.copy(schemas = project.schemas.filterNot(s => s.id.equals(id)))) + case None => + projects.put(projectId, project.copy(schemas = Seq.empty)) + } + updateProjectsMetadata() + case None => // Do nothing if project is deleted before the schema + } } } /** - * Adds the mapping definition to the project json file + * Add the mapping definition to the project * - * @param projectId Project id the mapping will be added to - * @param mapping Mapping to be added + * @param projectId Project id to which the mapping will be added to + * @param mapping Mapping to be added */ - def addMapping(projectId: String, mapping: FhirMapping): Unit = { - val project: Project = projects(projectId) - projects.put(projectId, project.copy(mappings = project.mappings :+ mapping)) - - updateProjectsMetadata() + override def addMapping(projectId: String, mapping: FhirMapping): Future[Project] = { + Future { + val project: Project = 
projects(projectId) + val updatedProject = project.copy(mappings = project.mappings :+ mapping) + projects.put(projectId, updatedProject) + updateProjectsMetadata() + updatedProject + } } /** - * Replaces the mapping definition in the project json file + * Replaces the mapping definition within the project * - * @param projectId Project id the mapping will be updated to - * @param mapping Mapping to be updated + * @param projectId Project id of which the mapping will be updated + * @param mapping Mapping to be updated */ - def updateMapping(projectId: String, mapping: FhirMapping): Unit = { - deleteMapping(projectId, Some(mapping.id)) - addMapping(projectId, mapping) + override def updateMapping(projectId: String, mapping: FhirMapping): Future[Project] = { + deleteMapping(projectId, Some(mapping.id)) flatMap { _ => + addMapping(projectId, mapping) + } } /** - * Deletes the mapping definition from the project json file + * Deletes the mapping definition from the project * - * @param projectId Project id the mapping will be deleted from + * @param projectId Project id from which the mapping will be deleted * @param mappingId The unique identifier of the mapping to be deleted. If not provided, all mappings for the project will be deleted. 
*/ - def deleteMapping(projectId: String, mappingId: Option[String] = None): Unit = { - val project: Option[Project] = projects.get(projectId) - project match { - case Some(project) => - mappingId match { - case Some(id) => - projects.put(projectId, project.copy(mappings = project.mappings.filterNot(m => m.id.equals(id)))) - case None => - projects.put(projectId, project.copy(mappings = Seq.empty)) - } - updateProjectsMetadata() - case None => // Do nothing if project is deleted before the mapping + override def deleteMapping(projectId: String, mappingId: Option[String] = None): Future[Unit] = { + Future { + val project: Option[Project] = projects.get(projectId) + project match { + case Some(project) => + mappingId match { + case Some(id) => + projects.put(projectId, project.copy(mappings = project.mappings.filterNot(m => m.id.equals(id)))) + case None => + projects.put(projectId, project.copy(mappings = Seq.empty)) + } + updateProjectsMetadata() + case None => // Do nothing if project is deleted before the mapping + } } } /** - * Adds the job definition to the project json file + * Adds the job definition to the project * * @param projectId * @param job */ - def addJob(projectId: String, job: FhirMappingJob): Unit = { - val project: Project = projects(projectId) - projects.put(projectId, project.copy(mappingJobs = project.mappingJobs :+ job)) - - updateProjectsMetadata() + override def addJob(projectId: String, job: FhirMappingJob): Future[Project] = { + Future { + val project: Project = projects(projectId) + val updatedProject = project.copy(mappingJobs = project.mappingJobs :+ job) + projects.put(projectId, updatedProject) + updateProjectsMetadata() + updatedProject + } } /** @@ -240,9 +257,10 @@ class ProjectFolderRepository(config: ToFhirEngineConfig) extends IProjectReposi * @param projectId * @param job */ - def updateJob(projectId: String, job: FhirMappingJob): Unit = { - deleteJob(projectId, Some(job.id)) - addJob(projectId, job) + override def 
updateJob(projectId: String, job: FhirMappingJob): Future[Project] = { + deleteJob(projectId, Some(job.id)) flatMap { _ => + addJob(projectId, job) + } } /** @@ -251,57 +269,66 @@ class ProjectFolderRepository(config: ToFhirEngineConfig) extends IProjectReposi * @param projectId * @param jobId The unique identifier of the job to be deleted. If not provided, all jobs for the project will be deleted. */ - def deleteJob(projectId: String, jobId: Option[String] = None): Unit = { - val project: Option[Project] = projects.get(projectId) - project match { - case Some(project) => - jobId match { - case Some(id) => - projects.put(projectId, project.copy(mappingJobs = project.mappingJobs.filterNot(j => j.id.equals(id)))) - case None => - projects.put(projectId, project.copy(mappingJobs = Seq.empty)) - } - updateProjectsMetadata() - case None => // Do nothing if project is deleted before the job + override def deleteJob(projectId: String, jobId: Option[String] = None): Future[Unit] = { + Future { + val project: Option[Project] = projects.get(projectId) + project match { + case Some(project) => + jobId match { + case Some(id) => + projects.put(projectId, project.copy(mappingJobs = project.mappingJobs.filterNot(j => j.id.equals(id)))) + case None => + projects.put(projectId, project.copy(mappingJobs = Seq.empty)) + } + updateProjectsMetadata() + case None => // Do nothing if project is deleted before the job + } } } /** * Adds the mapping context id to the project json file - * @param projectId Project id the mapping context will be added to + * + * @param projectId Project id the mapping context will be added to * @param mappingContext Mapping context id to be added */ - def addMappingContext(projectId: String, mappingContext: String): Unit = { - val project: Project = projects(projectId) - projects.put(projectId, project.copy(mappingContexts = project.mappingContexts :+ mappingContext)) - - updateProjectsMetadata() + override def addMappingContext(projectId: String, 
mappingContext: String): Future[Project] = { + Future { + val project: Project = projects(projectId) + val updatedProject = project.copy(mappingContexts = project.mappingContexts :+ mappingContext) + projects.put(projectId, updatedProject) + updateProjectsMetadata() + updatedProject + } } /** * Deletes the mapping context in the project json file - * @param projectId Project id the mapping context will be deleted + * + * @param projectId Project id the mapping context will be deleted * @param mappingContextId The unique identifier of the mapping context to be deleted. If not provided, all mapping contexts for the project will be deleted. */ - def deleteMappingContext(projectId: String, mappingContextId: Option[String] = None): Unit = { - val project: Option[Project] = projects.get(projectId) - project match { - case Some(project) => - mappingContextId match { - case Some(id) => - projects.put(projectId, project.copy(mappingContexts = project.mappingContexts.filterNot(m => m.equals(id)))) - case None => - projects.put(projectId, project.copy(mappingContexts = Seq.empty)) - } - updateProjectsMetadata() - case None => // Do nothing if project is deleted before the mapping context + override def deleteMappingContext(projectId: String, mappingContextId: Option[String] = None): Future[Unit] = { + Future { + val project: Option[Project] = projects.get(projectId) + project match { + case Some(project) => + mappingContextId match { + case Some(id) => + projects.put(projectId, project.copy(mappingContexts = project.mappingContexts.filterNot(m => m.equals(id)))) + case None => + projects.put(projectId, project.copy(mappingContexts = Seq.empty)) + } + updateProjectsMetadata() + case None => // Do nothing if project is deleted before the mapping context + } } } /** * Updates the projects metadata with project included in the cache. 
*/ - def updateProjectsMetadata() = { + private def updateProjectsMetadata(): Unit = { val file = FileUtils.getPath(ProjectFolderRepository.PROJECTS_JSON).toFile // when projects metadata file does not exist, create it if (!file.exists()) { diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/schema/ISchemaRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/schema/ISchemaRepository.scala index 3869bf6c7..66d883fc0 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/schema/ISchemaRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/schema/ISchemaRepository.scala @@ -3,6 +3,8 @@ package io.tofhir.server.repository.schema import io.onfhir.api.Resource import io.onfhir.definitions.common.model.SchemaDefinition import io.tofhir.engine.mapping.schema.IFhirSchemaLoader +import io.tofhir.engine.repository.ICachedRepository +import io.tofhir.server.repository.project.IProjectList import scala.concurrent.Future @@ -10,7 +12,7 @@ import scala.concurrent.Future * Interface to save and load SchemaDefinitions * so that the client applications can manage the schemas through CRUD operations */ -trait ISchemaRepository extends IFhirSchemaLoader { +trait ISchemaRepository extends IFhirSchemaLoader with ICachedRepository with IProjectList[SchemaDefinition] { /** * Retrieve the metadata of all SchemaDefinitions (only id, url, type and name fields are populated) @@ -69,7 +71,7 @@ trait ISchemaRepository extends IFhirSchemaLoader { * * @param projectId The unique identifier of the project for which schemas should be deleted. */ - def deleteProjectSchemas(projectId: String): Unit + def deleteAllSchemas(projectId: String): Future[Unit] /** * Retrieve the Structure Definition of the schema identified by its id. 
diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/schema/SchemaFolderRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/schema/SchemaFolderRepository.scala index 2a2dd7803..3cc0015ee 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/schema/SchemaFolderRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/schema/SchemaFolderRepository.scala @@ -6,11 +6,11 @@ import io.onfhir.api.util.IOUtil import io.onfhir.api.validation.ProfileRestrictions import io.onfhir.api.{FHIR_FOUNDATION_RESOURCES, FHIR_ROOT_URL_FOR_DEFINITIONS, Resource} import io.onfhir.config.{BaseFhirConfig, FSConfigReader, IFhirConfigReader} +import io.onfhir.definitions.common.model.SchemaDefinition +import io.onfhir.definitions.common.util.HashUtil import io.onfhir.definitions.resource.service.SimpleStructureDefinitionService import io.onfhir.exception.InitializationException import io.onfhir.util.JsonFormatter._ -import io.onfhir.definitions.common.model.SchemaDefinition -import io.onfhir.definitions.common.util.HashUtil import io.tofhir.common.util.SchemaUtil import io.tofhir.engine.Execution.actorSystem.dispatcher import io.tofhir.engine.config.ToFhirConfig @@ -19,7 +19,7 @@ import io.tofhir.engine.model.exception.FhirMappingException import io.tofhir.engine.util.FileUtils.FileExtensions import io.tofhir.engine.util.{FhirVersionUtil, FileUtils} import io.tofhir.server.common.model.{AlreadyExists, BadRequest, ResourceNotFound} -import io.tofhir.server.repository.project.ProjectFolderRepository +import io.tofhir.server.repository.project.IProjectRepository import io.tofhir.server.util.FileOperations import org.apache.spark.sql.types.StructType @@ -34,9 +34,9 @@ import scala.language.postfixOps * Folder/Directory based schema repository implementation. 
* * @param schemaRepositoryFolderPath - * @param projectFolderRepository + * @param projectRepository */ -class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRepository: ProjectFolderRepository) extends AbstractSchemaRepository { +class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectRepository: IProjectRepository) extends AbstractSchemaRepository { private val logger: Logger = Logger(this.getClass) @@ -46,20 +46,12 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe // BaseFhirConfig will act as a validator for the schema definitions by holding the ProfileDefinitions in memory private var baseFhirConfig: BaseFhirConfig = initBaseFhirConfig(fhirConfigReader) private val simpleStructureDefinitionService = new SimpleStructureDefinitionService(baseFhirConfig) + // Schema definition cache: project id -> schema id -> schema definition private val schemaDefinitions: mutable.Map[String, mutable.Map[String, SchemaDefinition]] = mutable.Map.empty[String, mutable.Map[String, SchemaDefinition]] // Initialize the map for the first time initMap(schemaRepositoryFolderPath) - /** - * Returns the schema cached schema definitions by this repository - * - * @return - */ - def getCachedSchemas(): mutable.Map[String, mutable.Map[String, SchemaDefinition]] = { - schemaDefinitions - } - /** * Retrieve all SchemaDefinitions * @@ -67,11 +59,9 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe */ override def getAllSchemas(projectId: String): Future[Seq[SchemaDefinition]] = { Future { - if (schemaDefinitions.contains(projectId)) { - schemaDefinitions(projectId).values.toSeq.sortWith(schemaComparisonFunc) - } else { - Seq.empty - } + schemaDefinitions.get(projectId) + .map(_.values.toSeq.sortWith(schemaComparisonFunc)) // If such a project exists, return the schema definitions as a sorted sequence + .getOrElse(Seq.empty[SchemaDefinition]) // Else, return an empty list } } @@ -161,7 +151,7 @@ class 
SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe } // Update the file - getFileForSchema(projectId, schemaDefinition.id).map(file => { + getFileForSchema(projectId, schemaDefinition.id).flatMap(file => { val fw = new FileWriter(file) fw.write(structureDefinitionResource.toPrettyJson) fw.close() @@ -169,12 +159,12 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe // Update cache baseFhirConfig.profileRestrictions = SchemaManagementUtil.updateProfileRestrictionsMap(baseFhirConfig.profileRestrictions, schemaDefinition.url, schemaDefinition.version, fhirFoundationResourceParser.parseStructureDefinition(structureDefinitionResource, includeElementMetadata = true)) - schemaDefinitions(projectId).put(schemaId, schemaDefinition) // Update the project - projectFolderRepository.updateSchema(projectId, schemaDefinition) - schemaDefinition + projectRepository.updateSchema(projectId, schemaDefinition) map { _ => + schemaDefinition + } }) } @@ -191,7 +181,7 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe } // delete schema file from repository - getFileForSchema(projectId, schemaId).map(file => { + getFileForSchema(projectId, schemaId).flatMap(file => { file.delete() val schema: SchemaDefinition = schemaDefinitions(projectId)(schemaId) // delete the schema from the in-memory map @@ -199,7 +189,7 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe // remove the url of the schema baseFhirConfig.profileRestrictions -= schema.url // Update project - projectFolderRepository.deleteSchema(projectId, Some(schemaId)) + projectRepository.deleteSchema(projectId, Some(schemaId)) }) } @@ -208,18 +198,21 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe * * @param projectId The unique identifier of the project for which schemas should be deleted. 
*/ - override def deleteProjectSchemas(projectId: String): Unit = { - // delete schema definitions for the project - org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.getPath(schemaRepositoryFolderPath, projectId).toFile) - // remove profile restrictions of project schemas - val schemaUrls: Set[String] = schemaDefinitions.getOrElse(projectId, mutable.Map.empty) - .values.map(definition => definition.url) - .toSet - baseFhirConfig.profileRestrictions --= schemaUrls - // remove project from the cache - schemaDefinitions.remove(projectId) - // delete project schemas - projectFolderRepository.deleteSchema(projectId) + override def deleteAllSchemas(projectId: String): Future[Unit] = { + Future { + // delete schema definitions for the project + org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.getPath(schemaRepositoryFolderPath, projectId).toFile) + // remove profile restrictions of project schemas + val schemaUrls: Set[String] = schemaDefinitions.getOrElse(projectId, mutable.Map.empty) + .values.map(definition => definition.url) + .toSet + baseFhirConfig.profileRestrictions --= schemaUrls + // remove project from the cache + schemaDefinitions.remove(projectId) + } flatMap { _ => + // delete project schemas + projectRepository.deleteSchema(projectId) + } } /** @@ -230,7 +223,7 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe * @return */ private def getFileForSchema(projectId: String, schemaId: String): Future[File] = { - projectFolderRepository.getProject(projectId) map { project => + projectRepository.getProject(projectId) map { project => if (project.isEmpty) throw new IllegalStateException(s"This should not be possible. 
ProjectId: $projectId does not exist in the project folder repository.") FileOperations.getFileForEntityWithinProject(schemaRepositoryFolderPath, project.get.id, schemaId) } @@ -280,7 +273,7 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe System.exit(1) } } - if(projectSchemas.isEmpty) { + if (projectSchemas.isEmpty) { // No processable schema files under projectFolder logger.warn(s"There are no processable schema files under ${projectFolder.getAbsolutePath}. Skipping ${projectFolder.getName}.") } else { @@ -419,24 +412,32 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe * Throws: * {@link AlreadyExists} with the code 409 if the schema ID is not unique in the project or the schema URL is not unique * - * @param projectId Identifier of the project to check schema ID's in it - * @param schemaId Identifier of the schema - * @param schemaCanonicalUrl Canonical Url of the schema with its version - * @param schemaName Name of the schema. If provided its uniqueness is checked within the given projectId. + * @param projectId Identifier of the project to check schema ID's in it + * @param schemaId Identifier of the schema + * @param schemaCanonicalUrl Canonical Url of the schema with its version + * @param schemaName Name of the schema. If provided its uniqueness is checked within the given projectId. 
*/ private def checkIfSchemaIsUnique(projectId: String, schemaId: String, schemaUrl: String, schemaName: Option[String]): Unit = { - if (schemaDefinitions.contains(projectId) && schemaDefinitions(projectId).contains(schemaId)) { + // Check the uniqueness of the schemaId + schemaDefinitions.get(projectId).flatMap(_.get(schemaId)).foreach { _ => throw AlreadyExists("Schema already exists.", s"A schema definition with id $schemaId already exists in the schema repository at ${FileUtils.getPath(schemaRepositoryFolderPath).toAbsolutePath.toString}") } + + // Collect the Canonical URLs of all schema definitions and check the uniqueness of the schemaUrl val schemaCanonicalUrls: Map[String, String] = schemaDefinitions.values.flatMap(_.values).map(schema => s"${schema.url}|${schema.version}" -> schema.name).toMap - if (schemaCanonicalUrls.contains(schemaUrl)) { - throw AlreadyExists("Schema already exists.", s"A schema definition with url $schemaUrl already exists. Check the schema '${schemaCanonicalUrls(schemaUrl)}'") + schemaCanonicalUrls.get(schemaUrl).foreach { schemaName => + throw AlreadyExists("Schema already exists.", s"A schema definition with url $schemaUrl already exists. 
Check the schema '$schemaName'") } - if (schemaName.isDefined) { - if (schemaDefinitions.contains(projectId) && schemaDefinitions(projectId).values.exists(_.name == schemaName.get)) { - throw AlreadyExists("Schema already exists.", s"A schema definition with name ${schemaName.get} already exists in the schema repository at ${FileUtils.getPath(schemaRepositoryFolderPath).toAbsolutePath.toString}") + + // If a schemaName exists, check the uniqueness of the schemaName + schemaName.foreach { name => + schemaDefinitions.get(projectId).foreach { schemas => + if (schemas.values.exists(_.name == name)) { + throw AlreadyExists("Schema already exists.", s"A schema definition with name $name already exists in the schema repository at ${FileUtils.getPath(schemaRepositoryFolderPath).toAbsolutePath.toString}") + } } } + } /** @@ -448,20 +449,20 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe * @return */ private def writeSchemaAndUpdateCaches(projectId: String, structureDefinitionResource: Resource, schemaDefinition: SchemaDefinition): Future[SchemaDefinition] = { - getFileForSchema(projectId, schemaDefinition.id).map(newFile => { + getFileForSchema(projectId, schemaDefinition.id).flatMap(newFile => { val fw = new FileWriter(newFile) fw.write(structureDefinitionResource.toPrettyJson) fw.close() - // Update the project with the schema - projectFolderRepository.addSchema(projectId, schemaDefinition) - // Update the caches with the new schema baseFhirConfig.profileRestrictions = SchemaManagementUtil.updateProfileRestrictionsMap(baseFhirConfig.profileRestrictions, schemaDefinition.url, schemaDefinition.version, fhirFoundationResourceParser.parseStructureDefinition(structureDefinitionResource, includeElementMetadata = true)) schemaDefinitions.getOrElseUpdate(projectId, mutable.Map.empty).put(schemaDefinition.id, schemaDefinition) - schemaDefinition + // Update the project with the schema + projectRepository.addSchema(projectId, schemaDefinition) 
map { _ => + schemaDefinition + } }) } @@ -524,12 +525,24 @@ class SchemaFolderRepository(schemaRepositoryFolderPath: String, projectFolderRe /** * Reload the schema definitions from the given folder + * * @return */ - def reloadSchemaDefinitions(): Unit = { + override def invalidate(): Unit = { this.schemaDefinitions.clear() this.baseFhirConfig = initBaseFhirConfig(fhirConfigReader) initMap(schemaRepositoryFolderPath) } + + /** + * Retrieve the projects and SchemaDefinitions within. + * + * @return Map of projectId -> Seq[SchemaDefinition] + */ + override def getProjectPairs: Map[String, Seq[SchemaDefinition]] = { + schemaDefinitions.map { case (projectId, schemaPairs) => + projectId -> schemaPairs.values.toSeq + }.toMap + } } diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/ITerminologySystemRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/ITerminologySystemRepository.scala index fd82833ac..37e9d2fe1 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/ITerminologySystemRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/ITerminologySystemRepository.scala @@ -1,5 +1,6 @@ package io.tofhir.server.repository.terminology +import io.tofhir.engine.repository.ICachedRepository import io.tofhir.server.model.TerminologySystem import scala.concurrent.Future @@ -8,7 +9,7 @@ import scala.concurrent.Future * Interface to save and load TerminologySystem so that the client applications can manage the TerminologySystem * through CRUD operations */ -trait ITerminologySystemRepository { +trait ITerminologySystemRepository extends ICachedRepository{ /** * Retrieve the metadata of all TerminologySystems diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/TerminologySystemFolderRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/TerminologySystemFolderRepository.scala index 
47661ba1a..7c89edb87 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/TerminologySystemFolderRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/TerminologySystemFolderRepository.scala @@ -1,6 +1,7 @@ package io.tofhir.server.repository.terminology import io.tofhir.engine.Execution.actorSystem.dispatcher +import io.tofhir.engine.repository.ICachedRepository import io.tofhir.engine.util.FileUtils import io.tofhir.server.common.model.{AlreadyExists, BadRequest, ResourceNotFound} import io.tofhir.server.model.TerminologySystem @@ -14,7 +15,7 @@ import scala.concurrent.Future /** * Folder/Directory based terminology system repository implementation. */ -class TerminologySystemFolderRepository(terminologySystemsFolderPath: String) extends ITerminologySystemRepository { +class TerminologySystemFolderRepository(terminologySystemsFolderPath: String) extends ITerminologySystemRepository with ICachedRepository { // terminology system id -> TerminologySystem private val terminologySystemMap: mutable.Map[String, TerminologySystem] = mutable.Map.empty[String, TerminologySystem] @@ -72,7 +73,7 @@ class TerminologySystemFolderRepository(terminologySystemsFolderPath: String) ex /** * Update a TerminologySystem * - * @param id id of the TerminologySystem + * @param id id of the TerminologySystem * @param terminologySystem TerminologySystem to update * @return updated TerminologySystem */ @@ -92,7 +93,7 @@ class TerminologySystemFolderRepository(terminologySystemsFolderPath: String) ex }.toSeq this.updateTerminologySystemsDBFile(updatedLocalTerminologies) // update concept maps/code systems files - this.updateConceptMapAndCodeSystemFiles(foundTerminology, terminologySystem) map(_ => { + this.updateConceptMapAndCodeSystemFiles(foundTerminology, terminologySystem) map (_ => { // update the terminology service in the map this.terminologySystemMap.put(id, terminologySystem) terminologySystem @@ -254,9 +255,10 @@ 
class TerminologySystemFolderRepository(terminologySystemsFolderPath: String) ex /** * Reload the terminology systems from the given folder + * * @return */ - def reloadTerminologySystems(): Unit = { + def invalidate(): Unit = { this.terminologySystemMap.clear() initMap() } diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/codesystem/CodeSystemRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/codesystem/CodeSystemFolderRepository.scala similarity index 99% rename from tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/codesystem/CodeSystemRepository.scala rename to tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/codesystem/CodeSystemFolderRepository.scala index 8632af97e..2a6c370bb 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/codesystem/CodeSystemRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/codesystem/CodeSystemFolderRepository.scala @@ -16,7 +16,7 @@ import org.json4s.jackson.Serialization.writePretty import java.io.FileWriter import scala.concurrent.Future -class CodeSystemRepository(terminologySystemFolderPath: String) extends ICodeSystemRepository { +class CodeSystemFolderRepository(terminologySystemFolderPath: String) extends ICodeSystemRepository { /** * Retrieve the code system within a terminology * diff --git a/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/conceptmap/ConceptMapRepository.scala b/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/conceptmap/ConceptMapFolderRepository.scala similarity index 99% rename from tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/conceptmap/ConceptMapRepository.scala rename to tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/conceptmap/ConceptMapFolderRepository.scala index 14d3b121f..c8604da92 100644 --- 
a/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/conceptmap/ConceptMapRepository.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/repository/terminology/conceptmap/ConceptMapFolderRepository.scala @@ -17,7 +17,7 @@ import org.json4s.jackson.Serialization.writePretty import java.io.{File, FileWriter} import scala.concurrent.Future -class ConceptMapRepository(terminologySystemFolderPath: String) extends IConceptMapRepository { +class ConceptMapFolderRepository(terminologySystemFolderPath: String) extends IConceptMapRepository { /** * Retrieve the concept maps within a terminology diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala index 7997ee7b8..fc221d217 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala @@ -5,12 +5,13 @@ import io.onfhir.path.IFhirPathFunctionLibraryFactory import io.tofhir.common.util.CustomMappingFunctionsFactory import io.tofhir.engine.config.ToFhirConfig import io.tofhir.engine.mapping.context.MappingContextLoader -import io.tofhir.engine.mapping.job.FhirMappingJobManager +import io.tofhir.engine.mapping.job.{FhirMappingJobManager, MappingJobScheduler} import io.tofhir.engine.model._ import io.tofhir.engine.util.FhirMappingJobFormatter.formats import io.tofhir.engine.util.FileUtils import io.tofhir.engine.util.FileUtils.FileExtensions import io.tofhir.engine.{Execution, ToFhirEngine} +import io.tofhir.engine.env.EnvironmentVariableResolver import io.tofhir.rxnorm.RxNormApiFunctionLibraryFactory import io.tofhir.server.common.model.{BadRequest, ResourceNotFound} import io.tofhir.server.model.{ExecuteJobTask, TestResourceCreationRequest} @@ -18,7 +19,6 @@ import io.tofhir.server.repository.job.IJobRepository import io.tofhir.server.repository.mapping.IMappingRepository import 
io.tofhir.server.repository.schema.ISchemaRepository import io.tofhir.server.util.DataFrameUtil -import io.tofhir.engine.mapping.job.MappingJobScheduler import org.apache.commons.io import org.apache.spark.sql.KeyValueGroupedDataset import org.json4s.JsonAST.{JBool, JObject, JValue} @@ -59,135 +59,134 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin * @return */ def runJob(projectId: String, jobId: String, executionId: Option[String], executeJobTask: Option[ExecuteJobTask]): Future[Unit] = { - if (!jobRepository.getCachedMappingsJobs.contains(projectId) || !jobRepository.getCachedMappingsJobs(projectId).contains(jobId)) { - throw ResourceNotFound("Mapping job does not exists.", s"A mapping job with id $jobId does not exists in the mapping job repository") - } - - val mappingJob: FhirMappingJob = jobRepository.getCachedMappingsJobs(projectId)(jobId) - - // get the list of mapping task to be executed - val mappingTasks = executeJobTask.flatMap(_.mappingTaskNames) match { - case Some(names) => names.flatMap(name => mappingJob.mappings.find(p => p.name.contentEquals(name))) - case None => mappingJob.mappings - } - - if (mappingTasks.isEmpty) { - throw BadRequest("No mapping task to execute!", "No mapping task to execute!") - } - - // all the mappings in mappingTasks should not running, if any of them is running, give already running response - val JobExecutionMap = toFhirEngine.runningJobRegistry.getRunningExecutions() - val jobExecution = JobExecutionMap.get(jobId) - if (jobExecution.isDefined) { - val runningMappingTaskNames = jobExecution.get.flatMap(_._2) - val runningMappingTaskNamesSet = runningMappingTaskNames.toSet - val mappingTaskNamesSet = mappingTasks.map(_.name).toSet - val intersection = runningMappingTaskNamesSet.intersect(mappingTaskNamesSet) - if (intersection.nonEmpty) { - // create jvalue json response with already running mappingTask names as list string in values and execution ids in keys - // find execution ids 
for the intersection mappingTask names - val executionIds = jobExecution.get - .filter(p => p._2.exists(name => intersection.contains(name))) - .map(x => { - (x._1, x._2.filter(name => intersection.contains(name))) - }) - // convert execution ids to json response - val jValueResponse = JArray(executionIds.map(x => { - JObject( - "executionId" -> JString(x._1), - "mappingTaskNames" -> JArray(x._2.map(JString(_)).toList) - ) - }).toList) - // use it in the response message - throw BadRequest("Mapping tasks are already running!", JsonMethods.compact(JsonMethods.render(jValueResponse))) - } - } - - // create an instance of MappingJobScheduler - val mappingJobScheduler: MappingJobScheduler = MappingJobScheduler.instance(ToFhirConfig.engineConfig.toFhirDbFolderPath) - - // create execution - val mappingJobExecution = FhirMappingJobExecution(executionId.getOrElse(UUID.randomUUID().toString), job = mappingJob, - projectId = projectId, mappingTasks = mappingTasks) - val fhirMappingJobManager = new FhirMappingJobManager( - toFhirEngine.mappingRepo, - toFhirEngine.contextLoader, - toFhirEngine.schemaLoader, - toFhirEngine.functionLibraries, - toFhirEngine.sparkSession, - Some(mappingJobScheduler) - ) - - // Streaming jobs - val submittedJob = Future { - if (mappingJob.sourceSettings.exists(_._2.asStream)) { - // Delete checkpoint directory if set accordingly - if (executeJobTask.exists(_.clearCheckpoints)) { - mappingTasks.foreach(mappingTask => { - // Reset the archiving offset so that the archiving starts from scratch - toFhirEngine.fileStreamInputArchiver.resetOffset(mappingJobExecution, mappingTask.name) - - val checkpointDirectory: File = new File(mappingJobExecution.getCheckpointDirectory(mappingTask.name)) - io.FileUtils.deleteDirectory(checkpointDirectory) - logger.debug(s"Deleted checkpoint directory for jobId: ${mappingJobExecution.jobId}, executionId: ${mappingJobExecution.id}, mappingTaskName: ${mappingTask.name}, path: ${checkpointDirectory.getAbsolutePath}") - 
}) + jobRepository.getJob(projectId, jobId) map { + case None => throw ResourceNotFound("Mapping job does not exists.", s"A mapping job with id $jobId does not exists in the mapping job repository") + case Some(mj) => + val mappingJob = EnvironmentVariableResolver.resolveFhirMappingJob(mj) + + // get the list of mapping task to be executed + val mappingTasks = executeJobTask.flatMap(_.mappingTaskNames) match { + case Some(names) => names.flatMap(name => mappingJob.mappings.find(p => p.name.contentEquals(name))) + case None => mappingJob.mappings } - fhirMappingJobManager - .startMappingJobStream( - mappingJobExecution, - sourceSettings = mappingJob.sourceSettings, - sinkSettings = mappingJob.sinkSettings, - terminologyServiceSettings = mappingJob.terminologyServiceSettings, - identityServiceSettings = mappingJob.getIdentityServiceSettings() - ) - .foreach(sq => toFhirEngine.runningJobRegistry.registerStreamingQuery(mappingJobExecution, sq._1, sq._2)) - } + if (mappingTasks.isEmpty) { + throw BadRequest("No mapping task to execute!", "No mapping task to execute!") + } - // Batch jobs - else { - // run the mapping job with scheduler - if (mappingJob.schedulingSettings.nonEmpty) { - // check whether the job execution is already scheduled - if (executionId.nonEmpty && toFhirEngine.runningJobRegistry.isScheduled(mappingJob.id, executionId.get)) { - throw BadRequest("The mapping job execution is already scheduled!", s"The mapping job execution is already scheduled!") + // all the mappings in mappingTasks should not running, if any of them is running, give already running response + val JobExecutionMap = toFhirEngine.runningJobRegistry.getRunningExecutions() + val jobExecution = JobExecutionMap.get(jobId) + if (jobExecution.isDefined) { + val runningMappingTaskNames = jobExecution.get.flatMap(_._2) + val runningMappingTaskNamesSet = runningMappingTaskNames.toSet + val mappingTaskNamesSet = mappingTasks.map(_.name).toSet + val intersection = 
runningMappingTaskNamesSet.intersect(mappingTaskNamesSet) + if (intersection.nonEmpty) { + // create jvalue json response with already running mappingTask names as list string in values and execution ids in keys + // find execution ids for the intersection mappingTask names + val executionIds = jobExecution.get + .filter(p => p._2.exists(name => intersection.contains(name))) + .map(x => { + (x._1, x._2.filter(name => intersection.contains(name))) + }) + // convert execution ids to json response + val jValueResponse = JArray(executionIds.map(x => { + JObject( + "executionId" -> JString(x._1), + "mappingTaskNames" -> JArray(x._2.map(JString(_)).toList) + ) + }).toList) + // use it in the response message + throw BadRequest("Mapping tasks are already running!", JsonMethods.compact(JsonMethods.render(jValueResponse))) } - // schedule the mapping job - fhirMappingJobManager - .scheduleMappingJob( - mappingJobExecution = mappingJobExecution, - sourceSettings = mappingJob.sourceSettings, - sinkSettings = mappingJob.sinkSettings, - schedulingSettings = mappingJob.schedulingSettings.get, - terminologyServiceSettings = mappingJob.terminologyServiceSettings, - identityServiceSettings = mappingJob.getIdentityServiceSettings() - ) - // start scheduler - mappingJobScheduler.scheduler.start() - // register the job to the registry - toFhirEngine.runningJobRegistry.registerSchedulingJob(mappingJobExecution, mappingJobScheduler.scheduler) } - // run the batch job without scheduling - else { - val executionFuture: Future[Unit] = fhirMappingJobManager - .executeMappingJob( - mappingJobExecution = mappingJobExecution, - sourceSettings = mappingJob.sourceSettings, - sinkSettings = mappingJob.sinkSettings, - terminologyServiceSettings = mappingJob.terminologyServiceSettings, - identityServiceSettings = mappingJob.getIdentityServiceSettings() - ) - // Register the job to the registry - toFhirEngine.runningJobRegistry.registerBatchJob( - mappingJobExecution, - Some(executionFuture), - 
s"Spark job for job: ${mappingJobExecution.jobId} mappingTaskNames: ${mappingTasks.map(_.name).mkString(" ")}" - ) - } - } + // create an instance of MappingJobScheduler + val mappingJobScheduler: MappingJobScheduler = MappingJobScheduler.instance(ToFhirConfig.engineConfig.toFhirDbFolderPath) + + // create execution + val mappingJobExecution = FhirMappingJobExecution(executionId.getOrElse(UUID.randomUUID().toString), job = mappingJob, + projectId = projectId, mappingTasks = mappingTasks) + val fhirMappingJobManager = new FhirMappingJobManager( + toFhirEngine.mappingRepo, + toFhirEngine.contextLoader, + toFhirEngine.schemaLoader, + toFhirEngine.functionLibraries, + toFhirEngine.sparkSession, + Some(mappingJobScheduler) + ) + + // Streaming jobs + val submittedJob = + if (mappingJob.sourceSettings.exists(_._2.asStream)) { + // Delete checkpoint directory if set accordingly + if (executeJobTask.exists(_.clearCheckpoints)) { + mappingTasks.foreach(mappingTask => { + // Reset the archiving offset so that the archiving starts from scratch + toFhirEngine.fileStreamInputArchiver.resetOffset(mappingJobExecution, mappingTask.name) + + val checkpointDirectory: File = new File(mappingJobExecution.getCheckpointDirectory(mappingTask.name)) + io.FileUtils.deleteDirectory(checkpointDirectory) + logger.debug(s"Deleted checkpoint directory for jobId: ${mappingJobExecution.jobId}, executionId: ${mappingJobExecution.id}, mappingTaskName: ${mappingTask.name}, path: ${checkpointDirectory.getAbsolutePath}") + }) + } + + fhirMappingJobManager + .startMappingJobStream( + mappingJobExecution, + sourceSettings = mappingJob.sourceSettings, + sinkSettings = mappingJob.sinkSettings, + terminologyServiceSettings = mappingJob.terminologyServiceSettings, + identityServiceSettings = mappingJob.getIdentityServiceSettings() + ) + .foreach(sq => toFhirEngine.runningJobRegistry.registerStreamingQuery(mappingJobExecution, sq._1, sq._2)) + } + + // Batch jobs + else { + // run the mapping job with 
scheduler + if (mappingJob.schedulingSettings.nonEmpty) { + // check whether the job execution is already scheduled + if (executionId.nonEmpty && toFhirEngine.runningJobRegistry.isScheduled(mappingJob.id, executionId.get)) { + throw BadRequest("The mapping job execution is already scheduled!", s"The mapping job execution is already scheduled!") + } + // schedule the mapping job + fhirMappingJobManager + .scheduleMappingJob( + mappingJobExecution = mappingJobExecution, + sourceSettings = mappingJob.sourceSettings, + sinkSettings = mappingJob.sinkSettings, + schedulingSettings = mappingJob.schedulingSettings.get, + terminologyServiceSettings = mappingJob.terminologyServiceSettings, + identityServiceSettings = mappingJob.getIdentityServiceSettings() + ) + // start scheduler + mappingJobScheduler.scheduler.start() + // register the job to the registry + toFhirEngine.runningJobRegistry.registerSchedulingJob(mappingJobExecution, mappingJobScheduler.scheduler) + } + + // run the batch job without scheduling + else { + val executionFuture: Future[Unit] = fhirMappingJobManager + .executeMappingJob( + mappingJobExecution = mappingJobExecution, + sourceSettings = mappingJob.sourceSettings, + sinkSettings = mappingJob.sinkSettings, + terminologyServiceSettings = mappingJob.terminologyServiceSettings, + identityServiceSettings = mappingJob.getIdentityServiceSettings() + ) + // Register the job to the registry + toFhirEngine.runningJobRegistry.registerBatchJob( + mappingJobExecution, + Some(executionFuture), + s"Spark job for job: ${mappingJobExecution.jobId} mappingTaskNames: ${mappingTasks.map(_.name).mkString(" ")}" + ) + } + } + submittedJob } - submittedJob } /** @@ -200,54 +199,55 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin * @return */ def testMappingWithJob(projectId: String, jobId: String, testResourceCreationRequest: TestResourceCreationRequest): Future[Seq[FhirMappingResultsForInput]] = { - if 
(!jobRepository.getCachedMappingsJobs.contains(projectId) || !jobRepository.getCachedMappingsJobs(projectId).contains(jobId)) { - throw ResourceNotFound("Mapping job does not exists.", s"A mapping job with id $jobId does not exists in the mapping job repository.") - } - val mappingJob: FhirMappingJob = jobRepository.getCachedMappingsJobs(projectId)(jobId) - - logger.debug(s"Testing the mapping ${testResourceCreationRequest.fhirMappingTask.mappingRef} inside the job $jobId by selecting ${testResourceCreationRequest.resourceFilter.numberOfRows} ${testResourceCreationRequest.resourceFilter.order} records.") - - // If an unmanaged mapping is provided within the mapping task, normalize the context urls - val mappingTask: FhirMappingTask = - testResourceCreationRequest.fhirMappingTask.mapping match { - case None => testResourceCreationRequest.fhirMappingTask - case _ => - // get the path of mapping file which will be used to normalize mapping context urls - val pathToMappingFile: File = FileUtils.getPath(ToFhirConfig.engineConfig.mappingRepositoryFolderPath, projectId, s"${testResourceCreationRequest.fhirMappingTask.mapping.get.id}${FileExtensions.JSON}").toFile - // normalize the mapping context urls - val mappingWithNormalizedContextUrls: FhirMapping = MappingContextLoader.normalizeContextURLs(Seq((testResourceCreationRequest.fhirMappingTask.mapping.get, pathToMappingFile))).head - // Copy the mapping with the normalized urls - testResourceCreationRequest.fhirMappingTask.copy(mapping = Some(mappingWithNormalizedContextUrls)) - } + jobRepository.getJob(projectId, jobId) flatMap { + case None => throw ResourceNotFound("Mapping job does not exists.", s"A mapping job with id $jobId does not exists in the mapping job repository") + case Some(mj) => + val mappingJob = EnvironmentVariableResolver.resolveFhirMappingJob(mj) + + logger.debug(s"Testing the mapping ${testResourceCreationRequest.fhirMappingTask.mappingRef} inside the job $jobId by selecting 
${testResourceCreationRequest.resourceFilter.numberOfRows} ${testResourceCreationRequest.resourceFilter.order} records.") + + // If an unmanaged mapping is provided within the mapping task, normalize the context urls + val mappingTask: FhirMappingTask = + testResourceCreationRequest.fhirMappingTask.mapping match { + case None => testResourceCreationRequest.fhirMappingTask + case _ => + // get the path of mapping file which will be used to normalize mapping context urls + val pathToMappingFile: File = FileUtils.getPath(ToFhirConfig.engineConfig.mappingRepositoryFolderPath, projectId, s"${testResourceCreationRequest.fhirMappingTask.mapping.get.id}${FileExtensions.JSON}").toFile + // normalize the mapping context urls + val mappingWithNormalizedContextUrls: FhirMapping = MappingContextLoader.normalizeContextURLs(Seq((testResourceCreationRequest.fhirMappingTask.mapping.get, pathToMappingFile))).head + // Copy the mapping with the normalized urls + testResourceCreationRequest.fhirMappingTask.copy(mapping = Some(mappingWithNormalizedContextUrls)) + } - val fhirMappingJobManager = new FhirMappingJobManager( - toFhirEngine.mappingRepo, - toFhirEngine.contextLoader, - toFhirEngine.schemaLoader, - toFhirEngine.functionLibraries, - toFhirEngine.sparkSession - ) - // Define the updated jobSourceSettings where asStream is set to false if the setting is a KafkaSourceSettings - val jobSourceSettings: Map[String, MappingJobSourceSettings] = mappingJob.sourceSettings.map { - case (key, kafkaSettings: KafkaSourceSettings) => - key -> kafkaSettings.copy(asStream = false) // Copy and update asStream to false for KafkaSourceSettings - case other => other // Keep other source settings unchanged - } + val fhirMappingJobManager = new FhirMappingJobManager( + toFhirEngine.mappingRepo, + toFhirEngine.contextLoader, + toFhirEngine.schemaLoader, + toFhirEngine.functionLibraries, + toFhirEngine.sparkSession + ) + // Define the updated jobSourceSettings where asStream is set to false if the 
setting is a KafkaSourceSettings + val jobSourceSettings: Map[String, MappingJobSourceSettings] = mappingJob.sourceSettings.map { + case (key, kafkaSettings: KafkaSourceSettings) => + key -> kafkaSettings.copy(asStream = false) // Copy and update asStream to false for KafkaSourceSettings + case other => other // Keep other source settings unchanged + } - val (fhirMapping, mappingJobSourceSettings, dataFrame) = fhirMappingJobManager.readJoinSourceData(mappingTask, jobSourceSettings, jobId = Some(jobId), isTestExecution = true) - val selectedDataFrame = DataFrameUtil.applyResourceFilter(dataFrame, testResourceCreationRequest.resourceFilter) - .distinct() // Remove duplicate rows to ensure each FHIR Resource is represented only once per source. - // This prevents confusion for users in the UI, as displaying the same resource multiple times could lead to misunderstandings. - fhirMappingJobManager.executeTask(mappingJob.id, mappingTask.name, fhirMapping, selectedDataFrame, mappingJobSourceSettings, mappingJob.terminologyServiceSettings, mappingJob.getIdentityServiceSettings(), projectId = Some(projectId)) - .map { resultingDataFrame => - // Import implicits for the Spark session - import toFhirEngine.sparkSession.implicits._ - // Group by the 'source' column - val grouped: KeyValueGroupedDataset[String, FhirMappingResult] = resultingDataFrame - .groupByKey((result: FhirMappingResult) => result.source) - // Map each group to FhirMappingResultForInput - grouped.mapGroups(FhirMappingResultConverter.convertToFhirMappingResultsForInput).collect().toSeq - } + val (fhirMapping, mappingJobSourceSettings, dataFrame) = fhirMappingJobManager.readJoinSourceData(mappingTask, jobSourceSettings, jobId = Some(jobId), isTestExecution = true) + val selectedDataFrame = DataFrameUtil.applyResourceFilter(dataFrame, testResourceCreationRequest.resourceFilter) + .distinct() // Remove duplicate rows to ensure each FHIR Resource is represented only once per source. 
+ // This prevents confusion for users in the UI, as displaying the same resource multiple times could lead to misunderstandings. + fhirMappingJobManager.executeTask(mappingJob.id, mappingTask.name, fhirMapping, selectedDataFrame, mappingJobSourceSettings, mappingJob.terminologyServiceSettings, mappingJob.getIdentityServiceSettings(), projectId = Some(projectId)) + .map { resultingDataFrame => + // Import implicits for the Spark session + import toFhirEngine.sparkSession.implicits._ + // Group by the 'source' column + val grouped: KeyValueGroupedDataset[String, FhirMappingResult] = resultingDataFrame + .groupByKey((result: FhirMappingResult) => result.source) + // Map each group to FhirMappingResultForInput + grouped.mapGroups(FhirMappingResultConverter.convertToFhirMappingResultsForInput).collect().toSeq + } + } } /** diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/MappingService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/MappingService.scala index 6073c7832..c23763cc5 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/MappingService.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/service/MappingService.scala @@ -6,7 +6,7 @@ import io.tofhir.server.common.model.{BadRequest, ResourceNotFound} import io.tofhir.server.repository.job.IJobRepository import io.tofhir.server.repository.mapping.IMappingRepository -import scala.concurrent.ExecutionContext.Implicits.global +import io.tofhir.engine.Execution.actorSystem.dispatcher import scala.concurrent.Future class MappingService(mappingRepository: IMappingRepository, jobRepository: IJobRepository) extends LazyLogging { diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/MetadataService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/MetadataService.scala index 10f2a106d..66c4550e1 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/MetadataService.scala +++ 
b/tofhir-server/src/main/scala/io/tofhir/server/service/MetadataService.scala @@ -10,6 +10,7 @@ import io.tofhir.server.config.RedCapServiceConfig import io.onfhir.definitions.resource.fhir.FhirDefinitionsConfig import io.tofhir.engine.Execution.actorSystem import io.tofhir.engine.Execution.actorSystem.dispatcher +import io.tofhir.engine.env.EnvironmentVariableResolver import io.tofhir.server.endpoint.MetadataEndpoint.SEGMENT_METADATA import io.tofhir.server.model.{Archiving, Metadata, RepositoryNames} @@ -56,7 +57,8 @@ class MetadataService(toFhirEngineConfig: ToFhirEngineConfig, erroneousRecordsFolder = toFhirEngineConfig.erroneousRecordsFolder, archiveFolder = toFhirEngineConfig.archiveFolder, streamArchivingFrequency = toFhirEngineConfig.streamArchivingFrequency - ) + ), + environmentVariables = EnvironmentVariableResolver.getEnvironmentVariables ) } diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ProjectService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ProjectService.scala index ec1b7e0e3..29c52f7d8 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/ProjectService.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ProjectService.scala @@ -65,20 +65,21 @@ class ProjectService(projectRepository: IProjectRepository, /** * Removes a project and associated resources. * - * @param id The unique identifier of the project to be removed. + * @param projectId The unique identifier of the project to be removed. 
*/ - def removeProject(id: String): Future[Unit] = { + def removeProject(projectId: String): Future[Unit] = { // first delete the project from repository - projectRepository.removeProject(id) + projectRepository.deleteProject(projectId) // if project deletion is failed throw the error .recover { case e: Throwable => throw e } // else delete jobs, mappings, mapping contexts and schemas as well - .map(_ => { - jobRepository.deleteProjectJobs(id) - mappingRepository.deleteProjectMappings(id) - mappingContextRepository.deleteProjectMappingContexts(id) - schemaRepository.deleteProjectSchemas(id) - }) + .flatMap { _ => + jobRepository.deleteAllJobs(projectId) map { _ => + mappingRepository.deleteAllMappings(projectId) + mappingContextRepository.deleteAllMappingContexts(projectId) + schemaRepository.deleteAllSchemas(projectId) + } + } } /** diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/RedCapService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/RedCapService.scala index 79234229f..36bd9ab6a 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/RedCapService.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/service/RedCapService.scala @@ -11,7 +11,7 @@ import io.tofhir.server.model.redcap.RedCapProjectConfig import org.json4s.DefaultFormats import org.json4s.jackson.{JsonMethods, Serialization} -import scala.concurrent.ExecutionContext.Implicits.global +import io.tofhir.engine.Execution.actorSystem.dispatcher import scala.concurrent.Future import scala.concurrent.duration.{DurationInt, FiniteDuration} diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ReloadService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ReloadService.scala index 743a0cc90..860e5ed05 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/ReloadService.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ReloadService.scala @@ -1,41 +1,34 @@ package io.tofhir.server.service +import 
io.tofhir.engine.Execution.actorSystem.dispatcher +import io.tofhir.server.repository.{FolderDBInitializer, IRepositoryManager} import io.tofhir.server.repository.job.JobFolderRepository import io.tofhir.server.repository.mapping.ProjectMappingFolderRepository import io.tofhir.server.repository.mappingContext.MappingContextFolderRepository import io.tofhir.server.repository.schema.SchemaFolderRepository import io.tofhir.server.repository.terminology.TerminologySystemFolderRepository -import io.tofhir.engine.Execution.actorSystem.dispatcher -import io.tofhir.engine.util.FileUtils -import io.tofhir.server.repository.project.ProjectFolderRepository -import io.tofhir.server.service.db.FolderDBInitializer import scala.concurrent.Future /** * Service for reloading resources from the file system. */ -class ReloadService(mappingRepository: ProjectMappingFolderRepository, - schemaRepository: SchemaFolderRepository, - mappingJobRepository: JobFolderRepository, - mappingContextRepository: MappingContextFolderRepository, - terminologySystemFolderRepository: TerminologySystemFolderRepository, - folderDBInitializer: FolderDBInitializer) { +class ReloadService(repositoryManager: IRepositoryManager) { /** * Reload all resources. 
+ * * @return */ def reloadResources(): Future[Unit] = { - Future{ - mappingRepository.reloadMappingDefinitions() - schemaRepository.reloadSchemaDefinitions() - mappingJobRepository.reloadJobDefinitions() - mappingContextRepository.reloadMappingContextDefinitions() - terminologySystemFolderRepository.reloadTerminologySystems() - // Delete projects.json before reload projects - folderDBInitializer.removeProjectsJsonFile() - folderDBInitializer.init() + Future { + repositoryManager.mappingRepository.invalidate() + repositoryManager.schemaRepository.invalidate() + repositoryManager.mappingJobRepository.invalidate() + repositoryManager.mappingContextRepository.invalidate() + repositoryManager.terminologySystemRepository.invalidate() + repositoryManager.clear() + repositoryManager.init() } } } diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/SchemaDefinitionService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/SchemaDefinitionService.scala index d1e1b4e52..0254ad283 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/SchemaDefinitionService.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/service/SchemaDefinitionService.scala @@ -7,6 +7,7 @@ import io.onfhir.api.Resource import io.onfhir.client.util.FhirClientUtil import io.onfhir.definitions.common.model.SchemaDefinition import io.tofhir.engine.Execution.actorSystem +import io.tofhir.engine.Execution.actorSystem.dispatcher import io.tofhir.engine.config.ToFhirConfig import io.tofhir.engine.data.read.SourceHandler import io.tofhir.engine.mapping.schema.SchemaConverter @@ -19,7 +20,6 @@ import io.tofhir.server.repository.mapping.IMappingRepository import io.tofhir.server.repository.schema.ISchemaRepository import org.apache.hadoop.shaded.org.apache.http.HttpStatus -import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future class SchemaDefinitionService(schemaRepository: ISchemaRepository, mappingRepository: IMappingRepository) extends 
LazyLogging { diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/terminology/TerminologySystemService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/terminology/TerminologySystemService.scala index 07fb16bfa..ca8249d08 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/terminology/TerminologySystemService.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/service/terminology/TerminologySystemService.scala @@ -6,7 +6,7 @@ import io.tofhir.server.model.TerminologySystem import io.tofhir.server.repository.job.IJobRepository import io.tofhir.server.repository.terminology.ITerminologySystemRepository -import scala.concurrent.ExecutionContext.Implicits.global +import io.tofhir.engine.Execution.actorSystem.dispatcher import scala.concurrent.Future class TerminologySystemService(terminologySystemRepository: ITerminologySystemRepository, mappingJobRepository: IJobRepository) extends LazyLogging { @@ -48,10 +48,9 @@ class TerminologySystemService(terminologySystemRepository: ITerminologySystemRe * @return Updated TerminologySystem */ def updateTerminologySystem(id: String, terminologySystem: TerminologySystem): Future[TerminologySystem] = { - // Update TerminologySystem first, then update Jobs Terminology fields + // Update TerminologySystem first, then update Jobs Terminology service setting fields terminologySystemRepository.updateTerminologySystem(id, terminologySystem).flatMap { terminologySystem => - // update Jobs Terminology fields if it is successful - updateJobTerminology(id, Some(terminologySystem)).map(_ => terminologySystem) + updateJobTerminologyServiceSettings(id, Some(terminologySystem)).map(_ => terminologySystem) } } @@ -65,60 +64,43 @@ class TerminologySystemService(terminologySystemRepository: ITerminologySystemRe // Delete terminology system first, then update Jobs Terminology fields terminologySystemRepository.deleteTerminologySystem(id).flatMap { terminologySystem => // update Jobs Terminology fields 
if it is successful - updateJobTerminology(id).map(_ => terminologySystem) + updateJobTerminologyServiceSettings(id).map(_ => terminologySystem) } } /** - * Check whether job's id and terminologySystem id is equal - * @param job job object to be checked - * @param terminologySystemId id of terminogySystem - * @return Boolean indicating whether job's id and terminologySystem's id are equal - */ - private def checkEqualityOfIds(job: FhirMappingJob, terminologySystemId: String): Boolean = { - // Get terminology service of the job - val terminologyServiceSettings: LocalFhirTerminologyServiceSettings = - job.terminologyServiceSettings.get.asInstanceOf[LocalFhirTerminologyServiceSettings] - - terminologyServiceSettings.folderPath.split('/').lastOption.get.equals(terminologySystemId) - } - - /** - * Update Job's terminology fields with the updated terminology system. + * Find the mappings jobs which refer to the given terminologySystemId and update their terminology service settings + * with the provided terminologySystem. If the provided terminologySystem is None, remove Job's terminology service settings. 
* - * @param id id of terminologySystem - * @param terminologySystem updated terminology system + * @param terminologySystemId id of TerminologySystem + * @param terminologySystem TerminologySystem * @return */ - private def updateJobTerminology(id: String, terminologySystem: Option[TerminologySystem] = Option.empty): Future[Unit] = { - // Get project ids from job cache - val projectIds: Seq[String] = mappingJobRepository.getCachedMappingsJobs.keys.toSeq - - Future.sequence(projectIds.map(projectId => { - // Get jobs of given project - mappingJobRepository.getAllJobs(projectId).flatMap { jobs => - Future.sequence(jobs.filter(job => { - job.terminologyServiceSettings.isDefined && job.terminologyServiceSettings.get.isInstanceOf[LocalFhirTerminologyServiceSettings] && checkEqualityOfIds(job, id) - }).map(jobToBeUpdated => { - // Create updated job object + private def updateJobTerminologyServiceSettings(terminologySystemId: String, terminologySystem: Option[TerminologySystem] = Option.empty): Future[Unit] = { + Future.sequence(mappingJobRepository.getProjectPairs.flatMap { // Get all mapping jobs grouped under their projects. 
+ case (projectId, mappingJobs) => + mappingJobs.filter { job => // Find the mapping jobs which refer to the given terminologySystemId + job.terminologyServiceSettings match { + case Some(settings: LocalFhirTerminologyServiceSettings) => + settings.folderPath.split('/').lastOption.contains(terminologySystemId) + case _ => false + } + } map { jobToBeUpdated => + // Create updated mapping job object val updatedJob = terminologySystem match { // Update terminology service case - case Some(ts) => { + case Some(ts) => val terminologyServiceSettings: LocalFhirTerminologyServiceSettings = jobToBeUpdated.terminologyServiceSettings.get.asInstanceOf[LocalFhirTerminologyServiceSettings] - val updatedTerminologyServiceSettings: LocalFhirTerminologyServiceSettings = terminologyServiceSettings.copy(conceptMapFiles = ts.conceptMaps, codeSystemFiles = ts.codeSystems) - jobToBeUpdated.copy(terminologyServiceSettings = Some(updatedTerminologyServiceSettings)) - } // Delete terminology service case case None => jobToBeUpdated.copy(terminologyServiceSettings = None) } // Update job in the repository mappingJobRepository.updateJob(projectId, jobToBeUpdated.id, updatedJob) - })) - } - })).map {_ => ()} + } + }.toSeq).map { _ => () } } } diff --git a/tofhir-server/src/test/scala/io/tofhir/server/service/ExecutionServiceTest.scala b/tofhir-server/src/test/scala/io/tofhir/server/service/ExecutionServiceTest.scala index 51d795065..581f5efe2 100644 --- a/tofhir-server/src/test/scala/io/tofhir/server/service/ExecutionServiceTest.scala +++ b/tofhir-server/src/test/scala/io/tofhir/server/service/ExecutionServiceTest.scala @@ -19,6 +19,7 @@ import org.scalatest.wordspec.AsyncWordSpec import java.io.{File, FileOutputStream} import java.nio.file.{Files, Paths} import scala.collection.mutable +import scala.concurrent.Future class ExecutionServiceTest extends AsyncWordSpec with Matchers with BeforeAndAfterAll { @@ -98,7 +99,7 @@ class ExecutionServiceTest extends AsyncWordSpec with Matchers with 
BeforeAndAft private def getMockMappingJobRepository: JobFolderRepository = { val mockMappingJobRepository: JobFolderRepository = mock[JobFolderRepository] - when(mockMappingJobRepository.getCachedMappingsJobs).thenReturn(mutable.Map("testProject" -> mutable.Map("testJob" -> testJob))) + when(mockMappingJobRepository.getJob("testProject", "testJob")).thenReturn(Future.apply(Some(testJob))) } private def getMockMappingRepository: ProjectMappingFolderRepository = {