diff --git a/dao/src/main/scala/za/co/absa/enceladus/dao/MenasDAO.scala b/dao/src/main/scala/za/co/absa/enceladus/dao/MenasDAO.scala
index 8623dd5ef..98c81ec70 100644
--- a/dao/src/main/scala/za/co/absa/enceladus/dao/MenasDAO.scala
+++ b/dao/src/main/scala/za/co/absa/enceladus/dao/MenasDAO.scala
@@ -16,8 +16,8 @@
package za.co.absa.enceladus.dao
import org.apache.spark.sql.types.StructType
+import za.co.absa.atum.model.{Checkpoint, ControlMeasure, RunStatus}
import za.co.absa.enceladus.model._
-import za.co.absa.atum.model._
import za.co.absa.enceladus.utils.validation.ValidationLevel.Constants.DefaultValidationLevel
import za.co.absa.enceladus.utils.validation.ValidationLevel.ValidationLevel
diff --git a/dao/src/test/scala/za/co/absa/enceladus/dao/rest/JsonSerializerSuite.scala b/dao/src/test/scala/za/co/absa/enceladus/dao/rest/JsonSerializerSuite.scala
index ae6667302..4f8315893 100644
--- a/dao/src/test/scala/za/co/absa/enceladus/dao/rest/JsonSerializerSuite.scala
+++ b/dao/src/test/scala/za/co/absa/enceladus/dao/rest/JsonSerializerSuite.scala
@@ -575,10 +575,7 @@ class JsonSerializerSuite extends BaseTestSuite with VersionedModelMatchers {
| },
| "startDateTime": "04-12-2017 16:19:17 +0200",
| "runStatus": {
- | "status": {
- | "enumClass": "za.co.absa.atum.model.RunState",
- | "value": "allSucceeded"
- | },
+ | "status": "allSucceeded",
| "error": null
| },
| "controlMeasure": {
diff --git a/data-model/src/main/scala/za/co/absa/enceladus/model/Run.scala b/data-model/src/main/scala/za/co/absa/enceladus/model/Run.scala
index e31f9b3db..52f5ae394 100644
--- a/data-model/src/main/scala/za/co/absa/enceladus/model/Run.scala
+++ b/data-model/src/main/scala/za/co/absa/enceladus/model/Run.scala
@@ -16,7 +16,6 @@
package za.co.absa.enceladus.model
import za.co.absa.atum.model.{ControlMeasure, RunStatus}
-import com.typesafe.config.{Config, ConfigFactory}
case class Run
(
diff --git a/migrations/src/main/scala/za/co/absa/enceladus/migrations/migrations/MigrationToV1.scala b/migrations/src/main/scala/za/co/absa/enceladus/migrations/migrations/MigrationToV1.scala
index a205b788d..2440466ee 100644
--- a/migrations/src/main/scala/za/co/absa/enceladus/migrations/migrations/MigrationToV1.scala
+++ b/migrations/src/main/scala/za/co/absa/enceladus/migrations/migrations/MigrationToV1.scala
@@ -40,6 +40,7 @@ object MigrationToV1 extends MigrationBase with CollectionMigration with JsonMig
createIndex("schema", Seq(IndexField("name", ASC), IndexField("version", ASC)), unique = true)
createIndex("mapping_table", Seq(IndexField("name", ASC), IndexField("version", ASC)), unique = true)
createIndex("run", Seq(IndexField("dataset", ASC), IndexField("datasetVersion", ASC), IndexField("runId", ASC)), unique = true)
+ createIndex("run", Seq(IndexField("runId", ASC)))
createIndex("run", Seq(IndexField("uniqueId", ASC)), unique = true, sparse = true)
createIndex("attachment", Seq(IndexField("refName", ASC), IndexField("refVersion", ASC)))
diff --git a/pom.xml b/pom.xml
index c7e961d7e..64bcf1637 100644
--- a/pom.xml
+++ b/pom.xml
@@ -142,7 +142,7 @@
3.1.1
1.0.0
0.2.0
- 3.7.0
+ 3.8.2
2.7.3
3.5.4
2.4.2
@@ -154,7 +154,7 @@
4.4.1
2.10.4
2.10.4
- 2.9.8
+ 2.10.4
0.10.7
3.5.3
4.11
@@ -269,6 +269,15 @@
org.apache.httpcomponents
httpclient
+        <exclusions>
+          <exclusion>
+            <artifactId>jackson-databind</artifactId>
+            <groupId>com.fasterxml.jackson.core</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>jackson-annotations</artifactId>
+            <groupId>com.fasterxml.jackson.core</groupId>
+          </exclusion>
+        </exclusions>
@@ -285,6 +294,10 @@
org.apache.httpcomponents
httpclient
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+          </exclusion>
diff --git a/rest-api/pom.xml b/rest-api/pom.xml
index 1f1a22c53..153d3a5a6 100644
--- a/rest-api/pom.xml
+++ b/rest-api/pom.xml
@@ -63,6 +63,15 @@
javax.validation
validation-api
+        <exclusions>
+          <exclusion>
+            <artifactId>jackson-databind</artifactId>
+            <groupId>com.fasterxml.jackson.core</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>jackson-annotations</artifactId>
+            <groupId>com.fasterxml.jackson.core</groupId>
+          </exclusion>
+        </exclusions>
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/Application.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/Application.scala
index 39e9f5b97..e5d5352a4 100644
--- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/Application.scala
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/Application.scala
@@ -15,6 +15,7 @@
package za.co.absa.enceladus.rest_api
+import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -52,6 +53,11 @@ class Application() {
.registerModule(new JavaTimeModule())
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+
+ // todo consider including the following to unify serialization results - #2074
+ // .setSerializationInclusion(Include.NON_ABSENT)
+ // ^ fields of `Option[T]` are not included if None (same behavior as Atum's SerializationUtils.asJson)
+ // explanation: https://github.com/FasterXML/jackson-module-scala/issues/46#issuecomment-128770969
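+ // A rough sketch of the difference on a hypothetical case class (illustration only, not part of this change):
+ //   case class Widget(id: Int, note: Option[String])
+ //   default mapper:          Widget(1, None) -> {"id":1,"note":null}
+ //   with Include.NON_ABSENT: Widget(1, None) -> {"id":1}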
}
}
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/LandingPageController.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/LandingPageController.scala
index d28588974..c3d257a28 100644
--- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/LandingPageController.scala
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/LandingPageController.scala
@@ -16,7 +16,6 @@
package za.co.absa.enceladus.rest_api.controllers
import java.util.concurrent.CompletableFuture
-
import scala.concurrent.Future
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.Async
@@ -36,11 +35,11 @@ import za.co.absa.enceladus.rest_api.services.RunService
@RestController
@RequestMapping(Array("/api/landing"))
class LandingPageController @Autowired() (datasetRepository: DatasetMongoRepository,
- mappingTableRepository: MappingTableMongoRepository,
- schemaRepository: SchemaMongoRepository,
- runsService: RunService,
- landingPageRepository: LandingPageStatisticsMongoRepository,
- statisticsService: StatisticsService) extends BaseController {
+ mappingTableRepository: MappingTableMongoRepository,
+ schemaRepository: SchemaMongoRepository,
+ runService: RunService,
+ landingPageRepository: LandingPageStatisticsMongoRepository,
+ statisticsService: StatisticsService) extends BaseController {
import scala.concurrent.ExecutionContext.Implicits.global
import za.co.absa.enceladus.rest_api.utils.implicits._
@@ -54,7 +53,7 @@ class LandingPageController @Autowired() (datasetRepository: DatasetMongoReposit
val dsCountFuture = datasetRepository.distinctCount()
val mappingTableFuture = mappingTableRepository.distinctCount()
val schemaFuture = schemaRepository.distinctCount()
- val runFuture = runsService.getCount()
+ val runFuture = runService.getCount()
val propertiesWithMissingCountsFuture = statisticsService.getPropertiesWithMissingCount()
val propertiesTotalsFuture: Future[(Int, Int, Int)] = propertiesWithMissingCountsFuture.map(props => {
props.foldLeft(0, 0, 0) { (acum, item) =>
@@ -66,7 +65,7 @@ class LandingPageController @Autowired() (datasetRepository: DatasetMongoReposit
}
}
})
- val todaysStatsfuture = runsService.getTodaysRunsStatistics()
+ val todaysStatsfuture = runService.getTodaysRunsStatistics()
for {
dsCount <- dsCountFuture
mtCount <- mappingTableFuture
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/RunController.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/RunController.scala
index 472e6171c..df42e41db 100644
--- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/RunController.scala
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/RunController.scala
@@ -16,7 +16,6 @@
package za.co.absa.enceladus.rest_api.controllers
import java.util.concurrent.CompletableFuture
-
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
import org.springframework.security.core.annotation.AuthenticationPrincipal
@@ -63,13 +62,13 @@ class RunController @Autowired()(runService: RunService) extends BaseController
@GetMapping(Array("/grouped"))
@ResponseStatus(HttpStatus.OK)
def getRunSummariesPerDatasetName(): CompletableFuture[Seq[RunDatasetNameGroupedSummary]] = {
- runService.getRunSummariesPerDatasetName()
+ runService.getGroupedRunSummariesPerDatasetName()
}
@GetMapping(Array("/grouped/{datasetName}"))
@ResponseStatus(HttpStatus.OK)
def getRunSummariesPerDatasetVersion(@PathVariable datasetName: String): CompletableFuture[Seq[RunDatasetVersionGroupedSummary]] = {
- runService.getRunSummariesPerDatasetVersion(datasetName)
+ runService.getGroupedRunSummariesPerDatasetVersion(datasetName)
}
@GetMapping(Array("/bySparkAppId/{appId}"))
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/RunControllerV3.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/RunControllerV3.scala
new file mode 100644
index 000000000..f95cd5d06
--- /dev/null
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/RunControllerV3.scala
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.enceladus.rest_api.controllers.v3
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.http.{HttpStatus, ResponseEntity}
+import org.springframework.security.core.annotation.AuthenticationPrincipal
+import org.springframework.security.core.userdetails.UserDetails
+import org.springframework.web.bind.annotation._
+import org.springframework.web.servlet.support.ServletUriComponentsBuilder
+import za.co.absa.atum.model.{Checkpoint, ControlMeasureMetadata, RunStatus}
+import za.co.absa.enceladus.model.{Run, Validation}
+import za.co.absa.enceladus.rest_api.controllers.BaseController
+import za.co.absa.enceladus.rest_api.controllers.v3.RunControllerV3.LatestKey
+import za.co.absa.enceladus.rest_api.exceptions.{NotFoundException, ValidationException}
+import za.co.absa.enceladus.rest_api.models.RunSummary
+import za.co.absa.enceladus.rest_api.models.rest.MessageWrapper
+import za.co.absa.enceladus.rest_api.services.v3.RunServiceV3
+
+import java.net.URI
+import java.time.LocalDate
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+import javax.servlet.http.HttpServletRequest
+import scala.concurrent.Future
+import scala.util.{Failure, Success, Try}
+
+object RunControllerV3 {
+ final val LatestKey = "latest"
+}
+
+@RestController
+@RequestMapping(path = Array("/api-v3/runs"), produces = Array("application/json"))
+class RunControllerV3 @Autowired()(runService: RunServiceV3) extends BaseController {
+
+ import za.co.absa.enceladus.rest_api.utils.implicits._
+
+ import scala.concurrent.ExecutionContext.Implicits.global
+
+ @GetMapping()
+ @ResponseStatus(HttpStatus.OK)
+ def list(@RequestParam startDate: Optional[String],
+ @RequestParam sparkAppId: Optional[String],
+ @RequestParam uniqueId: Optional[String]
+ ): CompletableFuture[Seq[RunSummary]] = {
+ require(Seq(startDate, sparkAppId, uniqueId).count(_.isPresent) <= 1,
+ "You may only supply one of [startDate|sparkAppId|uniqueId].")
+
+ runService.getLatestOfEachRunSummary(
+ startDate = parseYmdDate(startDate),
+ sparkAppId = sparkAppId.toScalaOption,
+ uniqueId = uniqueId.toScalaOption
+ )
+ // todo pagination #2060
+ }
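+ // Illustrative requests (paths assumed from the mappings above, not an exhaustive contract):
+ //   GET /api-v3/runs                            -> latest run summary per (dataset, datasetVersion)
+ //   GET /api-v3/runs?startDate=2022-05-20       -> as above, limited to runs started on/after that date
+ //   GET /api-v3/runs?startDate=...&uniqueId=... -> rejected, at most one filter may be supplied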
+
+ // todo pagination #2060
+ @GetMapping(Array("/{datasetName}"))
+ @ResponseStatus(HttpStatus.OK)
+ def getSummariesByDatasetName(@PathVariable datasetName: String,
+ @RequestParam startDate: Optional[String]): CompletableFuture[Seq[RunSummary]] = {
+ runService.getLatestOfEachRunSummary(
+ datasetName = Some(datasetName),
+ startDate = parseYmdDate(startDate)
+ )
+ }
+
+ // todo pagination #2060
+ @GetMapping(Array("/{datasetName}/{datasetVersion}"))
+ @ResponseStatus(HttpStatus.OK)
+ def getSummariesByDatasetNameAndVersion(@PathVariable datasetName: String,
+ @PathVariable datasetVersion: Int,
+ @RequestParam startDate: Optional[String]): CompletableFuture[Seq[RunSummary]] = {
+ runService.getRunSummaries(
+ datasetName = datasetName,
+ datasetVersion = datasetVersion,
+ startDate = parseYmdDate(startDate)
+ )
+ }
+
+ @PostMapping(Array("/{datasetName}/{datasetVersion}"))
+ @ResponseStatus(HttpStatus.CREATED)
+ def create(
+ @PathVariable datasetName: String,
+ @PathVariable datasetVersion: Int,
+ @RequestBody run: Run,
+ @AuthenticationPrincipal principal: UserDetails,
+ request: HttpServletRequest): CompletableFuture[ResponseEntity[String]] = {
+ val createdRunFuture = if (datasetName != run.dataset) {
+ Future.failed(new IllegalArgumentException(s"URL and payload entity name mismatch: '$datasetName' != '${run.dataset}'"))
+ } else if (datasetVersion != run.datasetVersion) {
+ Future.failed(new IllegalArgumentException(s"URL and payload entity version mismatch: $datasetVersion != ${run.datasetVersion}"))
+ } else {
+ runService.create(run, principal.getUsername)
+ }
+
+ createdRunFuture.map { createdRun =>
+ val location: URI = ServletUriComponentsBuilder.fromRequest(request)
+ .path("/{runId}")
+ .buildAndExpand(createdRun.runId.toString)
+ .toUri
+ ResponseEntity.created(location).body(s"Run ${createdRun.runId} for dataset '$datasetName' v$datasetVersion created.")
+ }
+ }
+
+ // todo pagination #2060
+ @GetMapping(Array("/{datasetName}/{datasetVersion}/{runId}"))
+ @ResponseStatus(HttpStatus.OK)
+ def getRun(@PathVariable datasetName: String,
+ @PathVariable datasetVersion: Int,
+ @PathVariable runId: String): CompletableFuture[Run] = {
+ getRunForRunIdExpression(datasetName, datasetVersion, runId) // runId supports 'latest' for GET
+ }
+
+ @PutMapping(Array("/{datasetName}/{datasetVersion}/{runId}"))
+ @ResponseStatus(HttpStatus.OK)
+ def updateRunStatus(
+ @PathVariable datasetName: String,
+ @PathVariable datasetVersion: Int,
+ @PathVariable runId: Int,
+ @RequestBody newRunStatus: RunStatus): CompletableFuture[ResponseEntity[MessageWrapper]] = {
+ if (newRunStatus.status == null) {
+ Future.failed(new IllegalArgumentException("Invalid empty RunStatus submitted"))
+ } else {
+ runService.updateRunStatus(datasetName, datasetVersion, runId, newRunStatus).map(_ =>
+ ResponseEntity.ok(MessageWrapper(s"New runStatus $newRunStatus applied."))
+ )
+ }
+ }
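+ // Example payload for this endpoint, mirroring the RunStatus JSON shape used in JsonSerializerSuite:
+ //   PUT /api-v3/runs/{datasetName}/{datasetVersion}/{runId}
+ //   {"status": "allSucceeded", "error": null}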
+
+ // todo pagination #2060 ???
+ @GetMapping(Array("/{datasetName}/{datasetVersion}/{runId}/checkpoints"))
+ @ResponseStatus(HttpStatus.OK)
+ def getRunCheckpoints(@PathVariable datasetName: String,
+ @PathVariable datasetVersion: Int,
+ @PathVariable runId: String): CompletableFuture[Seq[Checkpoint]] = {
+ getRunForRunIdExpression(datasetName, datasetVersion, runId).map(_.controlMeasure.checkpoints)
+ }
+
+ @PostMapping(Array("/{datasetName}/{datasetVersion}/{runId}/checkpoints"))
+ @ResponseStatus(HttpStatus.CREATED)
+ def addCheckpoint(
+ @PathVariable datasetName: String,
+ @PathVariable datasetVersion: Int,
+ @PathVariable runId: Int,
+ @RequestBody newCheckpoint: Checkpoint,
+ request: HttpServletRequest): CompletableFuture[ResponseEntity[String]] = {
+ runService.addCheckpoint(datasetName, datasetVersion, runId, newCheckpoint).map { _ =>
+ val location: URI = ServletUriComponentsBuilder.fromRequest(request)
+ .path("/{cpName}")
+ .buildAndExpand(newCheckpoint.name)
+ .toUri
+ ResponseEntity.created(location).body(s"Checkpoint '${newCheckpoint.name}' added.")
+ }
+ }
+
+ @GetMapping(Array("/{datasetName}/{datasetVersion}/{runId}/checkpoints/{checkpointName}"))
+ @ResponseStatus(HttpStatus.OK)
+ def getRunCheckpointByName(@PathVariable datasetName: String,
+ @PathVariable datasetVersion: Int,
+ @PathVariable runId: String,
+ @PathVariable checkpointName: String): CompletableFuture[Checkpoint] = {
+ for {
+ checkpoints <- getRunForRunIdExpression(datasetName, datasetVersion, runId).map(_.controlMeasure.checkpoints)
+ cpByName = checkpoints.find(_.name == checkpointName).getOrElse(throw NotFoundException())
+ } yield cpByName
+ }
+
+ // todo pagination #2060 ???
+ @GetMapping(Array("/{datasetName}/{datasetVersion}/{runId}/metadata"))
+ @ResponseStatus(HttpStatus.OK)
+ def getRunMetadata(@PathVariable datasetName: String,
+ @PathVariable datasetVersion: Int,
+ @PathVariable runId: String): CompletableFuture[ControlMeasureMetadata] = {
+ getRunForRunIdExpression(datasetName, datasetVersion, runId).map(_.controlMeasure.metadata)
+ }
+
+ /**
+ * Retrieves a Run by dataset name, version and runId (either a number or 'latest')
+ *
+ * @param datasetName dataset name
+ * @param datasetVersion dataset version
+ * @param runIdStr runId (either a number or 'latest')
+ * @return Run object
+ */
+ protected def getRunForRunIdExpression(datasetName: String, datasetVersion: Int, runIdStr: String): Future[Run] = {
+ runIdStr match {
+ case LatestKey => runService.getLatestRun(datasetName, datasetVersion)
+ case nonLatestRunIdString => Try(nonLatestRunIdString.toInt) match {
+ case Success(actualRunId) => runService.getRun(datasetName, datasetVersion, actualRunId)
+ case Failure(exception) =>
+ Future.failed(new IllegalArgumentException(s"Cannot convert '$runIdStr' to a valid runId expression. " +
+ s"Either use 'latest' or an actual runId number. Underlying problem: ${exception.getMessage}"))
+ }
+ }
+ }
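+ // e.g. getRunForRunIdExpression("ds1", 2, "latest") resolves to the highest runId of ds1 v2,
+ // getRunForRunIdExpression("ds1", 2, "3") resolves runId 3, and a value like "3rd" fails with IllegalArgumentException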
+
+ protected def parseYmdDate(dateOptStr: Optional[String]): Option[LocalDate] = {
+ dateOptStr.toScalaOption.map { dateStr =>
+ Try(LocalDate.parse(dateStr)) match {
+ case Success(parsed) => parsed
+ case Failure(e) => throw new IllegalArgumentException(s"Could not parse YYYY-MM-DD date from $dateStr: ${e.getMessage}")
+ }
+ }
+ }
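+ // e.g. parseYmdDate(Optional.of("2022-05-20")) == Some(LocalDate.of(2022, 5, 20)); parseYmdDate(Optional.empty) == None;
+ // a non-ISO value such as "20-05-2022" is rejected with IllegalArgumentException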
+
+}
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/VersionedModelControllerV3.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/VersionedModelControllerV3.scala
index 2d1e7fcdc..0761efcfd 100644
--- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/VersionedModelControllerV3.scala
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/VersionedModelControllerV3.scala
@@ -24,7 +24,7 @@ import za.co.absa.enceladus.model.menas.audit._
import za.co.absa.enceladus.model.versionedModel._
import za.co.absa.enceladus.model.{ExportableObject, UsedIn, Validation}
import za.co.absa.enceladus.rest_api.controllers.BaseController
-import za.co.absa.enceladus.rest_api.controllers.v3.VersionedModelControllerV3.LatestVersionKey
+import za.co.absa.enceladus.rest_api.controllers.v3.VersionedModelControllerV3.LatestKey
import za.co.absa.enceladus.rest_api.exceptions.NotFoundException
import za.co.absa.enceladus.rest_api.models.rest.DisabledPayload
import za.co.absa.enceladus.rest_api.services.v3.VersionedModelServiceV3
@@ -37,7 +37,7 @@ import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
object VersionedModelControllerV3 {
- final val LatestVersionKey = "latest"
+ final val LatestKey = "latest"
}
abstract class VersionedModelControllerV3[C <: VersionedModel with Product
@@ -47,7 +47,7 @@ abstract class VersionedModelControllerV3[C <: VersionedModel with Product
import scala.concurrent.ExecutionContext.Implicits.global
- // todo maybe offset/limit -> Issue #2060
+ // todo maybe offset/limit = pagination -> Issue #2060
@GetMapping(Array(""))
@ResponseStatus(HttpStatus.OK)
def getList(@RequestParam searchQuery: Optional[String]): CompletableFuture[Seq[NamedVersion]] = {
@@ -196,7 +196,7 @@ abstract class VersionedModelControllerV3[C <: VersionedModel with Product
protected def forVersionExpression[T](name: String, versionStr: String)
(forVersionFn: (String, Int) => Future[T]): Future[T] = {
versionStr.toLowerCase match {
- case LatestVersionKey =>
+ case LatestKey =>
versionedModelService.getLatestVersionValue(name).flatMap {
case None => Future.failed(notFound())
case Some(actualLatestVersion) => forVersionFn(name, actualLatestVersion)
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/models/rest/MessageWrapper.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/models/rest/MessageWrapper.scala
new file mode 100644
index 000000000..2c0fed576
--- /dev/null
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/models/rest/MessageWrapper.scala
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.enceladus.rest_api.models.rest
+
+case class MessageWrapper(message: String)
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/repositories/RunMongoRepository.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/repositories/RunMongoRepository.scala
index 28286d413..ee48d651b 100644
--- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/repositories/RunMongoRepository.scala
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/repositories/RunMongoRepository.scala
@@ -42,7 +42,8 @@ object RunMongoRepository {
val collectionName: String = s"$collectionBaseName${model.CollectionSuffix}"
}
-@Repository
+// scalastyle:off number.of.methods legacy code
+@Repository("runMongoRepository") // by-name qualifier - for v2 repos
class RunMongoRepository @Autowired()(mongoDb: MongoDatabase)
extends MongoRepository[Run](mongoDb) {
@@ -50,7 +51,7 @@ class RunMongoRepository @Autowired()(mongoDb: MongoDatabase)
private[rest_api] override def collectionBaseName: String = RunMongoRepository.collectionBaseName
- private val summaryProjection: Bson = project(fields(
+ protected val summaryProjection: Bson = project(fields(
computed("datasetName", "$dataset"),
computed("status", "$runStatus.status"),
computed("runUniqueId", "$uniqueId"),
@@ -193,16 +194,17 @@ class RunMongoRepository @Autowired()(mongoDb: MongoDatabase)
.aggregate[BsonDocument](pipeline)
}
- def getRunSummariesPerDatasetName(): Future[Seq[RunDatasetNameGroupedSummary]] = {
+ def getGroupedRunSummariesPerDatasetName(): Future[Seq[RunDatasetNameGroupedSummary]] = {
val pipeline = Seq(
project(fields(
include("dataset"),
- Document("""{start: {
- | $dateFromString: {
- | dateString: "$startDateTime",
- | format: "%d-%m-%Y %H:%M:%S %z"
- | }
- |}},""".stripMargin),
+ Document(
+ """{start: {
+ | $dateFromString: {
+ | dateString: "$startDateTime",
+ | format: "%d-%m-%Y %H:%M:%S %z"
+ | }
+ |}},""".stripMargin),
Document(
"""{timezone: {
| $substrBytes: [
@@ -219,13 +221,14 @@ class RunMongoRepository @Autowired()(mongoDb: MongoDatabase)
project(fields(
computed("datasetName", "$_id"),
include("numberOfRuns"),
- Document("""{latestRunStartDateTime: {
- | $dateToString: {
- | date: "$latestStart",
- | format: "%d-%m-%Y %H:%M:%S %z",
- | timezone: "$timezone"
- | }
- |}},""".stripMargin),
+ Document(
+ """{latestRunStartDateTime: {
+ | $dateToString: {
+ | date: "$latestStart",
+ | format: "%d-%m-%Y %H:%M:%S %z",
+ | timezone: "$timezone"
+ | }
+ |}},""".stripMargin),
excludeId()
)),
sort(ascending("datasetName"))
@@ -236,17 +239,18 @@ class RunMongoRepository @Autowired()(mongoDb: MongoDatabase)
.toFuture()
}
- def getRunSummariesPerDatasetVersion(datasetName: String): Future[Seq[RunDatasetVersionGroupedSummary]] = {
+ def getGroupedRunSummariesPerDatasetVersion(datasetName: String): Future[Seq[RunDatasetVersionGroupedSummary]] = {
val pipeline = Seq(
filter(equal("dataset", datasetName)),
project(fields(
include("dataset", "datasetVersion"),
- Document("""{start: {
- | $dateFromString: {
- | dateString: "$startDateTime",
- | format: "%d-%m-%Y %H:%M:%S %z"
- | }
- |}},""".stripMargin),
+ Document(
+ """{start: {
+ | $dateFromString: {
+ | dateString: "$startDateTime",
+ | format: "%d-%m-%Y %H:%M:%S %z"
+ | }
+ |}},""".stripMargin),
Document(
"""{timezone: {
| $substrBytes: [
@@ -264,13 +268,14 @@ class RunMongoRepository @Autowired()(mongoDb: MongoDatabase)
project(fields(
computed("datasetVersion", "$_id"),
include("datasetName", "numberOfRuns"),
- Document("""{latestRunStartDateTime: {
- | $dateToString: {
- | date: "$latestStart",
- | format: "%d-%m-%Y %H:%M:%S %z",
- | timezone: "$timezone"
- | }
- |}},""".stripMargin),
+ Document(
+ """{latestRunStartDateTime: {
+ | $dateToString: {
+ | date: "$latestStart",
+ | format: "%d-%m-%Y %H:%M:%S %z",
+ | timezone: "$timezone"
+ | }
+ |}},""".stripMargin),
excludeId()
)),
sort(descending("datasetVersion"))
@@ -281,12 +286,16 @@ class RunMongoRepository @Autowired()(mongoDb: MongoDatabase)
.toFuture()
}
- def getRunBySparkAppId(appId: String): Future[Seq[Run]] = {
- val stdAppIdFilter = equal("controlMeasure.metadata.additionalInfo.std_application_id", appId)
- val conformAppIdFilter = equal("controlMeasure.metadata.additionalInfo.conform_application_id", appId)
+ protected def sparkIdFilter(sparkAppId: String): Bson = {
+ val stdAppIdFilter = equal("controlMeasure.metadata.additionalInfo.std_application_id", sparkAppId)
+ val conformAppIdFilter = equal("controlMeasure.metadata.additionalInfo.conform_application_id", sparkAppId)
+
+ or(stdAppIdFilter, conformAppIdFilter)
+ }
+ def getRunBySparkAppId(appId: String): Future[Seq[Run]] = {
collection
- .find[BsonDocument](or(stdAppIdFilter, conformAppIdFilter))
+ .find[BsonDocument](sparkIdFilter(appId))
.toFuture()
.map(_.map(bson => SerializationUtils.fromJson[Run](bson.toJson)))
}
@@ -316,7 +325,17 @@ class RunMongoRepository @Autowired()(mongoDb: MongoDatabase)
collection.withDocumentClass[BsonDocument].insertOne(bson).head()
}
- def appendCheckpoint(uniqueId: String, checkpoint: Checkpoint): Future[Option[Run]] = {
+ def getByUniqueId(uniqueId: String): Future[Option[Run]] = {
+ val filter = equal("uniqueId", uniqueId)
+
+ collection
+ .find[BsonDocument](filter)
+ .headOption()
+ .map(_.map(bson => SerializationUtils.fromJson[Run](bson.toJson)))
+ // why not just .find[Run]? Because RunStatus.status is a Scala enumeration value (RunState) that does not play nice with bson-serde
+ }
+
+ def appendCheckpointByUniqueId(uniqueId: String, checkpoint: Checkpoint): Future[Option[Run]] = {
val bsonCheckpoint = BsonDocument(SerializationUtils.asJson(checkpoint))
collection.withDocumentClass[BsonDocument].findOneAndUpdate(
equal("uniqueId", uniqueId),
@@ -325,6 +344,20 @@ class RunMongoRepository @Autowired()(mongoDb: MongoDatabase)
).headOption().map(_.map(bson => SerializationUtils.fromJson[Run](bson.toJson)))
}
+ def appendCheckpoint(datasetName: String, datasetVersion: Int, runId: Int, newCheckpoint: Checkpoint): Future[Option[Run]] = {
+ val filter = and(
+ equal("dataset", datasetName),
+ equal("datasetVersion", datasetVersion),
+ equal("runId", runId)
+ )
+ val bsonCheckpoint = BsonDocument(SerializationUtils.asJson(newCheckpoint))
+ collection.withDocumentClass[BsonDocument].findOneAndUpdate(
+ filter,
+ Updates.addToSet("controlMeasure.checkpoints", bsonCheckpoint),
+ FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)
+ ).headOption().map(_.map(bson => SerializationUtils.fromJson[Run](bson.toJson)))
+ }
+
def updateControlMeasure(uniqueId: String, controlMeasure: ControlMeasure): Future[Option[Run]] = {
val bsonControlMeasure = BsonDocument(SerializationUtils.asJson(controlMeasure))
collection.withDocumentClass[BsonDocument].findOneAndUpdate(
@@ -352,12 +385,27 @@ class RunMongoRepository @Autowired()(mongoDb: MongoDatabase)
).headOption().map(_.map(bson => SerializationUtils.fromJson[Run](bson.toJson)))
}
+ def updateRunStatus(datasetName: String, datasetVersion: Int, runId: Int, newRunStatus: RunStatus): Future[Option[Run]] = {
+ val filter = and(
+ equal("dataset", datasetName),
+ equal("datasetVersion", datasetVersion),
+ equal("runId", runId)
+ )
+
+ val bsonRunStatus = BsonDocument(SerializationUtils.asJson(newRunStatus))
+ collection.withDocumentClass[BsonDocument].findOneAndUpdate(
+ filter,
+ Updates.set("runStatus", bsonRunStatus),
+ FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)
+ ).headOption().map(_.map(bson => SerializationUtils.fromJson[Run](bson.toJson)))
+ }
+
def existsId(uniqueId: String): Future[Boolean] = {
collection.countDocuments(equal("uniqueId", uniqueId))
.map(_ > 0).head()
}
- private def getDatasetFilter(datasetName: String, datasetVersion: Int): Bson = {
+ protected def getDatasetFilter(datasetName: String, datasetVersion: Int): Bson = {
val datasetNameEq = equal("dataset", datasetName)
val datasetVersionEq = equal("datasetVersion", datasetVersion)
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/repositories/v3/RunMongoRepositoryV3.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/repositories/v3/RunMongoRepositoryV3.scala
new file mode 100644
index 000000000..25a42e699
--- /dev/null
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/repositories/v3/RunMongoRepositoryV3.scala
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.enceladus.rest_api.repositories.v3
+
+import org.mongodb.scala.{Document, MongoDatabase}
+import org.mongodb.scala.bson.BsonDocument
+import org.mongodb.scala.bson.conversions.Bson
+import org.mongodb.scala.model.Aggregates._
+import org.mongodb.scala.model.Projections._
+import org.mongodb.scala.model.Sorts._
+import org.mongodb.scala.model.{Filters, _}
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.stereotype.Repository
+import za.co.absa.enceladus.rest_api.models.RunSummary
+import za.co.absa.enceladus.rest_api.repositories.RunMongoRepository
+import za.co.absa.enceladus.rest_api.repositories.v3.RunMongoRepositoryV3.emptyBsonFilter
+
+import java.time.LocalDate
+import scala.concurrent.Future
+
+
+@Repository
+class RunMongoRepositoryV3 @Autowired()(mongoDb: MongoDatabase) extends RunMongoRepository(mongoDb) {
+
+ /**
+ * Yields latest-of-each run summaries (grouped by datasetName, datasetVersion).
+ * Optionally filtered by at most one of `startDate` (>=), `sparkAppId` (==), or `uniqueId` (==).
+ * The result is ordered by datasetName, datasetVersion (both ascending).
+ *
+ * @param datasetName optional dataset name to restrict the summaries to
+ * @param datasetVersion optional dataset version to restrict the summaries to (only together with `datasetName`)
+ * @param startDate only include runs started on or after this date
+ * @param sparkAppId only include the run carrying this std/conform Spark application id
+ * @param uniqueId only include the run with this uniqueId
+ * @return latest run summary per (datasetName, datasetVersion) pair
+ */
+ def getRunSummariesLatestOfEach(datasetName: Option[String] = None,
+ datasetVersion: Option[Int] = None,
+ startDate: Option[LocalDate] = None,
+ sparkAppId: Option[String] = None,
+ uniqueId: Option[String] = None
+ ): Future[Seq[RunSummary]] = {
+ val exclusiveFilterStage: Seq[Bson] = (startDate, sparkAppId, uniqueId) match {
+ case (None, None, None) => Seq()
+ case (Some(startDate), None, None) => startDateFilterAggStages(startDate)
+ case (None, Some(sparkAppId), None) => Seq(filter(sparkIdFilter(sparkAppId)))
+ case (None, None, Some(uniqueId)) => Seq(filter(Filters.eq("uniqueId", uniqueId)))
+ case _ => throw new IllegalArgumentException("At most 1 filter of [startDate|sparkAppId|uniqueId] is allowed!")
+ }
+
+ val datasetFilter: Bson = datasetNameVersionFilter(datasetName, datasetVersion)
+
+ val pipeline =
+ Seq(filter(datasetFilter)) ++
+ exclusiveFilterStage ++ // may be empty
+ Seq(
+ sort(descending("runId")), // this results in Accumulator.first to pickup max version
+ group(
+ // the fields are specifically selected for RunSummary usage
+ id = BsonDocument("""{"dataset": "$dataset", "datasetVersion": "$datasetVersion"}"""),
+ Accumulators.first("datasetName", "$dataset"),
+ Accumulators.first("datasetVersion", "$datasetVersion"),
+ Accumulators.first("runId", "$runId"),
+ Accumulators.first("status", "$runStatus.status"),
+ Accumulators.first("startDateTime", "$startDateTime"),
+ Accumulators.first("runUniqueId", "$uniqueId")
+ ),
+ project(fields(excludeId())), // id composed of dsName+dsVer no longer needed
+ sort(ascending("datasetName", "datasetVersion"))
+ )
+
+ collection
+ .aggregate[RunSummary](pipeline)
+ .toFuture()
+ }
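+ // The pipeline built above corresponds roughly to this mongo-shell aggregation (a sketch, using the field names above):
+ //   [ {$match: <datasetFilter>}, <optional exclusive filter stage(s)>,
+ //     {$sort: {runId: -1}},
+ //     {$group: {_id: {dataset: "$dataset", datasetVersion: "$datasetVersion"},
+ //               datasetName: {$first: "$dataset"}, datasetVersion: {$first: "$datasetVersion"},
+ //               runId: {$first: "$runId"}, status: {$first: "$runStatus.status"},
+ //               startDateTime: {$first: "$startDateTime"}, runUniqueId: {$first: "$uniqueId"}}},
+ //     {$project: {_id: 0}}, {$sort: {datasetName: 1, datasetVersion: 1}} ]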
+
+ def getRunSummaries(datasetName: Option[String] = None,
+ datasetVersion: Option[Int] = None,
+ startDate: Option[LocalDate] = None): Future[Seq[RunSummary]] = {
+
+ val dateFilterStages: Seq[Bson] = startDate.map(startDateFilterAggStages).getOrElse(Seq.empty)
+ val datasetFilter: Bson = datasetNameVersionFilter(datasetName, datasetVersion)
+
+ val pipeline =
+ Seq(filter(datasetFilter)) ++
+ dateFilterStages ++ // may be empty
+ Seq(
+ summaryProjection,
+ sort(ascending("datasetName", "datasetVersion", "runId"))
+ )
+
+ collection
+ .aggregate[RunSummary](pipeline)
+ .toFuture()
+ }
+
+ /**
+ * Adds aggregation stages that derive a typed `startDateTimeTyped` field from the string `startDateTime`
+ * and filter on it to be >= `startDate`
+ *
+ * @param startDate lower bound (inclusive, taken at start of day) for the runs' start date
+ * @return the addFields + filter aggregation stages
+ */
+ protected def startDateFilterAggStages(startDate: LocalDate): Seq[Bson] = {
+ Seq(
+ addFields(Field("startDateTimeTyped",
+ Document(
+ """{$dateFromString: {
+ | dateString: "$startDateTime",
+ | format: "%d-%m-%Y %H:%M:%S %z"
+ |}}""".stripMargin)
+ )),
+ filter(Filters.gte("startDateTimeTyped", startDate.atStartOfDay()))
+ )
+ }
+
+ protected def datasetNameVersionFilter(datasetName: Option[String], datasetVersion: Option[Int]): Bson = {
+ (datasetName, datasetVersion) match {
+ case (None, None) => emptyBsonFilter // all entities
+ case (Some(datasetName), None) => Filters.eq("dataset", datasetName)
+ case (Some(datasetName), Some(datasetVersion)) => Filters.and(
+ Filters.eq("dataset", datasetName),
+ Filters.eq("datasetVersion", datasetVersion)
+ )
+ case _ => throw new IllegalArgumentException("Disallowed dataset name/version combination." +
+ "For dataset (name, version) filtering, the only allowed combinations are:" +
+ "(None, None), (Some, None) and (Some, Some)")
+ }
+ }
+
+}
+
+object RunMongoRepositoryV3 {
+ val emptyBsonFilter = BsonDocument()
+
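+ // Usage sketch: the empty filter acts as a neutral element, e.g.
+ //   combineFilters(emptyBsonFilter, f) returns f unchanged; combineFilters(f, g) returns Filters.and(f, g)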
+ def combineFilters(filter1: Bson, filter2: Bson): Bson = (filter1, filter2) match {
+ case (doc1, doc2) if doc1 == emptyBsonFilter && doc2 == emptyBsonFilter => emptyBsonFilter
+ case (_, doc2) if doc2 == emptyBsonFilter => filter1
+ case (doc1, _) if doc1 == emptyBsonFilter => filter2
+ case (doc1, doc2) => Filters.and(doc1, doc2)
+ }
+}
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/RunService.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/RunService.scala
index f4ed05ff2..231eb4717 100644
--- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/RunService.scala
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/RunService.scala
@@ -16,7 +16,6 @@
package za.co.absa.enceladus.rest_api.services
import java.util.UUID
-
import com.mongodb.MongoWriteException
import org.joda.time.format.DateTimeFormat
import org.springframework.beans.factory.annotation.{Autowired, Value}
@@ -24,24 +23,24 @@ import org.springframework.stereotype.Service
import za.co.absa.atum.model.{Checkpoint, ControlMeasure, RunStatus}
import za.co.absa.enceladus.rest_api.exceptions.{NotFoundException, ValidationException}
import za.co.absa.enceladus.rest_api.models.{RunDatasetNameGroupedSummary, RunDatasetVersionGroupedSummary, RunSummary, TodaysRunsStatistics}
-import za.co.absa.enceladus.rest_api.repositories.RunMongoRepository
+import za.co.absa.enceladus.rest_api.repositories.{MongoRepository, RunMongoRepository}
import za.co.absa.enceladus.model.{Run, SplineReference, Validation}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
-@Service
-class RunService @Autowired()(val mongoRepository: RunMongoRepository)
+@Service("runService") // by-name qualifier: making V2 autowiring un-ambiguous
+class RunService @Autowired()(runMongoRepository: RunMongoRepository)
extends ModelService[Run] {
- protected val runMongoRepository: RunMongoRepository = mongoRepository // alias
+ override val mongoRepository: MongoRepository[Run] = runMongoRepository
- def getRunSummariesPerDatasetName(): Future[Seq[RunDatasetNameGroupedSummary]] = {
- runMongoRepository.getRunSummariesPerDatasetName()
+ def getGroupedRunSummariesPerDatasetName(): Future[Seq[RunDatasetNameGroupedSummary]] = {
+ runMongoRepository.getGroupedRunSummariesPerDatasetName()
}
- def getRunSummariesPerDatasetVersion(datasetName: String): Future[Seq[RunDatasetVersionGroupedSummary]] = {
- runMongoRepository.getRunSummariesPerDatasetVersion(datasetName)
+ def getGroupedRunSummariesPerDatasetVersion(datasetName: String): Future[Seq[RunDatasetVersionGroupedSummary]] = {
+ runMongoRepository.getGroupedRunSummariesPerDatasetVersion(datasetName)
}
import scala.concurrent.ExecutionContext.Implicits.global
@@ -91,6 +90,10 @@ class RunService @Autowired()(val mongoRepository: RunMongoRepository)
runMongoRepository.getRunBySparkAppId(appId)
}
+ def getRunByUniqueId(uniqueId: String): Future[Option[Run]] = {
+ runMongoRepository.getByUniqueId(uniqueId)
+ }
+
def getRun(datasetName: String, datasetVersion: Int, runId: Int): Future[Run] = {
runMongoRepository.getRun(datasetName, datasetVersion, runId).map {
case Some(run) => run
@@ -108,7 +111,7 @@ class RunService @Autowired()(val mongoRepository: RunMongoRepository)
def create(newRun: Run, username: String, retriesLeft: Int = 3): Future[Run] = {
for {
latestOpt <- runMongoRepository.getLatestRun(newRun.dataset, newRun.datasetVersion)
- run <- getRunIdentifiersIfAbsent(newRun, username, latestOpt)
+ run <- getRunIdentifiersIfAbsent(newRun, username, latestOpt) // adds uniqueId, replaces runId
validation <- validate(run)
createdRun <-
if (validation.isValid) {
@@ -132,7 +135,14 @@ class RunService @Autowired()(val mongoRepository: RunMongoRepository)
}
def addCheckpoint(uniqueId: String, checkpoint: Checkpoint): Future[Run] = {
- runMongoRepository.appendCheckpoint(uniqueId, checkpoint).map {
+ runMongoRepository.appendCheckpointByUniqueId(uniqueId, checkpoint).map {
+ case Some(run) => run
+ case None => throw NotFoundException()
+ }
+ }
+
+ def addCheckpoint(datasetName: String, datasetVersion: Int, runId: Int, newCheckpoint: Checkpoint): Future[Run] = {
+ runMongoRepository.appendCheckpoint(datasetName, datasetVersion, runId, newCheckpoint).map {
case Some(run) => run
case None => throw NotFoundException()
}
@@ -159,6 +169,13 @@ class RunService @Autowired()(val mongoRepository: RunMongoRepository)
}
}
+ def updateRunStatus(datasetName: String, datasetVersion: Int, runId: Int, newRunStatus: RunStatus): Future[Run] = {
+ runMongoRepository.updateRunStatus(datasetName, datasetVersion, runId, newRunStatus).map {
+ case Some(run) => run
+ case None => throw NotFoundException()
+ }
+ }
+
def validate(run: Run): Future[Validation] = {
validateUniqueId(run).map { validation =>
validateDatasetName(run, validation)
@@ -185,7 +202,7 @@ class RunService @Autowired()(val mongoRepository: RunMongoRepository)
}
}
- private def validateUniqueId(run: Run): Future[Validation] = {
+ protected def validateUniqueId(run: Run): Future[Validation] = {
val validation = Validation()
run.uniqueId match {
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/v3/RunServiceV3.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/v3/RunServiceV3.scala
new file mode 100644
index 000000000..7a2a526ac
--- /dev/null
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/v3/RunServiceV3.scala
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.enceladus.rest_api.services.v3
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.stereotype.Service
+import za.co.absa.atum.model.Checkpoint
+import za.co.absa.enceladus.model.{Run, Validation}
+import za.co.absa.enceladus.rest_api.exceptions.{NotFoundException, ValidationException}
+import za.co.absa.enceladus.rest_api.models.RunSummary
+import za.co.absa.enceladus.rest_api.repositories.v3.RunMongoRepositoryV3
+import za.co.absa.enceladus.rest_api.services.RunService
+
+import java.time.LocalDate
+import scala.concurrent.Future
+
+@Service
+class RunServiceV3 @Autowired()(runMongoRepository: RunMongoRepositoryV3, datasetServiceV3: DatasetServiceV3)
+ extends RunService(runMongoRepository) {
+
+ import scala.concurrent.ExecutionContext.Implicits.global
+
+ override def validate(run: Run): Future[Validation] = {
+ for {
+ uniqueness <- validateUniqueId(run)
+ dsExistence <- validateDatasetExists(run.dataset, run.datasetVersion)
+ } yield uniqueness.merge(dsExistence)
+ }
+
+ protected def validateDatasetExists(datasetName: String, datasetVersion: Int): Future[Validation] = {
+ datasetServiceV3.getVersion(datasetName, datasetVersion).map {
+ case None => Validation.empty.withError("dataset", s"Dataset $datasetName v$datasetVersion not found!")
+ case Some(_) => Validation.empty
+ }
+ }
+
+ /**
+ * Yields latest-of-each run summaries (grouped by datasetName, datasetVersion).
+ * Optionally filtered by at most one of `startDate` (>=), `sparkAppId` (==), or `uniqueId` (==).
+ * The result is ordered by datasetName, datasetVersion (both ascending).
+ *
+ * @param datasetName optional dataset name to restrict the summaries to (its existence is validated)
+ * @param startDate only include runs started on or after this date
+ * @param sparkAppId only include the run carrying this std/conform Spark application id
+ * @param uniqueId only include the run with this uniqueId
+ * @return latest run summary per (datasetName, datasetVersion) pair
+ */
+ def getLatestOfEachRunSummary(datasetName: Option[String] = None,
+ startDate: Option[LocalDate] = None,
+ sparkAppId: Option[String] = None,
+ uniqueId: Option[String] = None
+ ): Future[Seq[RunSummary]] = {
+ datasetName match {
+ case None => runMongoRepository.getRunSummariesLatestOfEach(None, None, startDate, sparkAppId, uniqueId)
+ case definedDsName @ Some(dsName) => datasetServiceV3.getLatestVersion(dsName).flatMap {
+ case None => Future.failed(NotFoundException(s"Dataset $datasetName at all."))
+ case Some(_) => runMongoRepository.getRunSummariesLatestOfEach(definedDsName, None, startDate, sparkAppId, uniqueId)
+
+ }
+ }
+ }
+
+ def getRunSummaries(datasetName: String,
+ datasetVersion: Int,
+ startDate: Option[LocalDate] = None): Future[Seq[RunSummary]] = {
+ datasetServiceV3.getVersion(datasetName, datasetVersion).flatMap {
+ case Some(_) => runMongoRepository.getRunSummaries(Some(datasetName), Some(datasetVersion), startDate)
+ case _ => Future.failed(NotFoundException(s"Dataset $datasetName v$datasetVersion does not exist."))
+ }
+ }
+
+ override def addCheckpoint(datasetName: String, datasetVersion: Int, runId: Int, newCheckpoint: Checkpoint): Future[Run] = {
+ // validating for non-duplicate cp name:
+ for {
+ optRun <- runMongoRepository.getRun(datasetName, datasetVersion, runId)
+ run = optRun.getOrElse(throw NotFoundException())
+ duplicateExists = run.controlMeasure.checkpoints.exists(_.name == newCheckpoint.name)
+ _ = if (duplicateExists) {
+ throw ValidationException(Validation.empty.withError("checkpoint.name",
+ s"Checkpoint with name ${newCheckpoint.name} already exists!"))
+ }
+ run <- super.addCheckpoint(datasetName, datasetVersion, runId, newCheckpoint)
+ } yield run
+ }
+
+}
diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/utils/implicits/package.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/utils/implicits/package.scala
index 5b0a4afd1..fa7c20558 100644
--- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/utils/implicits/package.scala
+++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/utils/implicits/package.scala
@@ -17,7 +17,6 @@ package za.co.absa.enceladus.rest_api.utils
import java.util.Optional
import java.util.concurrent.CompletableFuture
-
import io.github.cbartosiak.bson.codecs.jsr310.zoneddatetime.ZonedDateTimeAsDocumentCodec
import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
import org.bson.codecs.configuration.{CodecRegistries, CodecRegistry}
diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/DatasetControllerV3IntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/DatasetControllerV3IntegrationSuite.scala
index 74e58a4f8..95a012c13 100644
--- a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/DatasetControllerV3IntegrationSuite.scala
+++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/DatasetControllerV3IntegrationSuite.scala
@@ -73,7 +73,7 @@ class DatasetControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeA
val response = sendPost[Dataset, Validation](apiUrl, bodyOpt = Some(dataset))
assertCreated(response)
response.getBody shouldBe Validation.empty.withWarning("keyD", "Property 'keyD' is recommended to be present, but was not found!")
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/datasets/dummyDs/1")
val relativeLocation = stripBaseUrl(locationHeader) // because locationHeader contains domain, port, etc.
@@ -294,7 +294,7 @@ class DatasetControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeA
val response = sendPut[Dataset, Validation](s"$apiUrl/datasetA/2", bodyOpt = Some(datasetA3))
assertCreated(response)
response.getBody shouldBe Validation.empty.withWarning("keyD", "Property 'keyD' is recommended to be present, but was not found!")
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/datasets/datasetA/3")
val relativeLocation = stripBaseUrl(locationHeader) // because locationHeader contains domain, port, etc.
@@ -498,7 +498,7 @@ class DatasetControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeA
val response = sendPost[String, Validation](s"$apiUrl/datasetXYZ/import", bodyOpt = Some(importableDs()))
assertCreated(response)
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/datasets/datasetXYZ/2")
response.getBody shouldBe Validation.empty.withWarning("key3", "Property 'key3' is recommended to be present, but was not found!")
@@ -528,7 +528,7 @@ class DatasetControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeA
val response = sendPost[String, String](s"$apiUrl/datasetXYZ/import", bodyOpt = Some(importableDs()))
assertCreated(response)
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/datasets/datasetXYZ/1") // this is the first version
val relativeLocation = stripBaseUrl(locationHeader) // because locationHeader contains domain, port, etc.
@@ -989,7 +989,7 @@ class DatasetControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeA
// if, in the future, there can be a rule update resulting in a warning, let's reflect that here
response.getBody shouldBe Validation.empty
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/datasets/datasetA/2/rules/1") // increased version in the url and added rule #1
val response2 = sendGet[Dataset](s"$apiUrl/datasetA/2")
diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/MappingTableControllerV3IntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/MappingTableControllerV3IntegrationSuite.scala
index b89d599fe..b868d2be5 100644
--- a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/MappingTableControllerV3IntegrationSuite.scala
+++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/MappingTableControllerV3IntegrationSuite.scala
@@ -74,7 +74,7 @@ class MappingTableControllerV3IntegrationSuite extends BaseRestApiTestV3 with Be
val response = sendPost[MappingTable, Validation](apiUrl, bodyOpt = Some(mtA))
assertCreated(response)
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/mapping-tables/mtA/1")
val relativeLocation = stripBaseUrl(locationHeader) // because locationHeader contains domain, port, etc.
diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/PropertyDefinitionControllerV3IntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/PropertyDefinitionControllerV3IntegrationSuite.scala
index aa1c0a233..59880bb6d 100644
--- a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/PropertyDefinitionControllerV3IntegrationSuite.scala
+++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/PropertyDefinitionControllerV3IntegrationSuite.scala
@@ -77,7 +77,7 @@ class PropertyDefinitionControllerV3IntegrationSuite extends BaseRestApiTestV3 w
val response = sendPostByAdmin[PropertyDefinition, Validation](apiUrl, bodyOpt = Some(propertyDefinition))
assertCreated(response)
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/property-definitions/datasets/dummyName/1")
val response2 = sendGet[PropertyDefinition]("/property-definitions/datasets/dummyName/1")
@@ -93,7 +93,7 @@ class PropertyDefinitionControllerV3IntegrationSuite extends BaseRestApiTestV3 w
val response = sendPostByAdmin[String, Validation](apiUrl, bodyOpt = Some(propertyDefinition))
assertCreated(response)
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/property-definitions/datasets/smallPd/1")
val response2 = sendGet[PropertyDefinition]("/property-definitions/datasets/smallPd/1")
@@ -210,7 +210,7 @@ class PropertyDefinitionControllerV3IntegrationSuite extends BaseRestApiTestV3 w
val response = sendPutByAdmin[PropertyDefinition, Validation](s"$apiUrl/propertyDefinitionA/2", bodyOpt = Some(propertyDefinitionA3))
assertCreated(response)
response.getBody shouldBe Validation.empty
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/property-definitions/datasets/propertyDefinitionA/3")
val response2 = sendGet[PropertyDefinition](s"$apiUrl/propertyDefinitionA/3")
@@ -331,7 +331,7 @@ class PropertyDefinitionControllerV3IntegrationSuite extends BaseRestApiTestV3 w
val response = sendPostByAdmin[String, Validation](s"$apiUrl/propertyDefinitionXYZ/import", bodyOpt = Some(importablePd))
assertCreated(response)
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/property-definitions/datasets/propertyDefinitionXYZ/2")
response.getBody shouldBe Validation.empty
@@ -353,7 +353,7 @@ class PropertyDefinitionControllerV3IntegrationSuite extends BaseRestApiTestV3 w
"a the version of propertyDefinition created" in {
val response = sendPostByAdmin[String, String](s"$apiUrl/propertyDefinitionXYZ/import", bodyOpt = Some(importablePd))
assertCreated(response)
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/property-definitions/datasets/propertyDefinitionXYZ/1") // this is the first version
val response2 = sendGet[PropertyDefinition](s"$apiUrl/propertyDefinitionXYZ/1")
diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/RunControllerV3IntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/RunControllerV3IntegrationSuite.scala
new file mode 100644
index 000000000..830ea775f
--- /dev/null
+++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/RunControllerV3IntegrationSuite.scala
@@ -0,0 +1,711 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.enceladus.rest_api.integration.controllers.v3
+
+import org.junit.runner.RunWith
+import org.scalatest.matchers.should.Matchers
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.http.HttpStatus
+import org.springframework.test.context.ActiveProfiles
+import org.springframework.test.context.junit4.SpringRunner
+import za.co.absa.atum.model.{Checkpoint, ControlMeasure, ControlMeasureMetadata, RunError, RunState, RunStatus}
+import za.co.absa.atum.utils.SerializationUtils
+import za.co.absa.enceladus.model.test.factories.RunFactory.{getDummyMeasurement, getDummyRun}
+import za.co.absa.enceladus.model.test.factories.{DatasetFactory, RunFactory}
+import za.co.absa.enceladus.model.{Run, SplineReference, Validation}
+import za.co.absa.enceladus.rest_api.integration.controllers.BaseRestApiTestV3
+import za.co.absa.enceladus.rest_api.integration.fixtures.{DatasetFixtureService, FixtureService, RunFixtureService}
+import za.co.absa.enceladus.rest_api.models.rest.MessageWrapper
+import za.co.absa.enceladus.rest_api.models.{RunDatasetNameGroupedSummary, RunDatasetVersionGroupedSummary, RunSummary}
+
+import java.util.UUID
+
+@RunWith(classOf[SpringRunner])
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@ActiveProfiles(Array("withEmbeddedMongo"))
+class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
+
+ import za.co.absa.enceladus.rest_api.integration.RunImplicits.RunExtensions
+
+ @Autowired
+ private val runFixture: RunFixtureService = null
+
+ @Autowired
+ private val datasetFixture: DatasetFixtureService = null
+
+ override def fixtures: List[FixtureService[_]] = List(runFixture, datasetFixture)
+
+ private val apiUrl = "/runs"
+
+ s"GET $apiUrl" can {
+ "return 200" when {
+ "latest RunSummaries are queried" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2)
+ runFixture.add(dataset1ver1run1, dataset1ver1run2)
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1)
+ runFixture.add(dataset1ver2run1)
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1)
+ runFixture.add(dataset2ver1run1)
+
+ val response = sendGet[String](s"$apiUrl")
+
+ assertOk(response)
+
+ val actual = response.getBody
+ val expected = SerializationUtils.asJson(Seq(dataset1ver1run2, dataset1ver2run1, dataset2ver1run1).map(_.toSummary))
+ actual shouldBe expected
+ }
+
+ "latest RunSummaries are queried on startDate" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1, startDateTime = "30-01-2000 13:01:12 +0200")
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2, startDateTime = "22-05-2022 14:01:12 +0200")
+
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "19-05-2022 15:01:12 +0200")
+ val dataset1ver2run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 2, startDateTime = "22-05-2022 15:01:12 +0200")
+ val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "23-05-2022 15:01:12 +0200")
+
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1, startDateTime = "17-05-2022 13:01:12 +0200")
+ val dataset3ver1run1 = RunFactory.getDummyRun(dataset = "dataset3", datasetVersion = 1, runId = 1, startDateTime = "01-06-2022 13:01:12 +0200")
+ runFixture.add(
+ dataset1ver1run1, dataset1ver1run2,
+ dataset1ver2run1, dataset1ver2run2, dataset1ver2run3,
+ dataset2ver1run1, dataset3ver1run1
+ )
+
+ val response = sendGet[Array[RunSummary]](s"$apiUrl?startDate=2022-05-20")
+ val expected = Array(dataset1ver1run2, dataset1ver2run3, dataset3ver1run1).map(_.toSummary)
+ response.getBody shouldBe expected
+ }
+
+ "latest RunSummaries are queried on uniqueId" in {
+ val run1 = RunFactory.getDummyRun(dataset = "dataset1", runId = 1, uniqueId = Some("12345678-90ab-cdef-1234-567890abcdef"))
+ val run2 = RunFactory.getDummyRun(dataset = "dataset1", runId = 2, uniqueId = Some(UUID.randomUUID().toString)) // some other id
+ val run3 = RunFactory.getDummyRun(dataset = "datasetX", uniqueId = None)
+ runFixture.add(run1, run2, run3)
+
+ val response = sendGet[Array[RunSummary]](s"$apiUrl?uniqueId=12345678-90ab-cdef-1234-567890abcdef")
+ val expected = Array(run1).map(_.toSummary)
+ response.getBody shouldBe expected
+ }
+
+ "latest RunSummaries are queried on sparkAppId reference" in {
+ val run1 = RunFactory.getDummyRun(dataset = "dataset1",
+ controlMeasure = RunFactory.getDummyControlMeasure(
+ metadata = RunFactory.getDummyMetadata(additionalInfo = Map(
+ "std_application_id" -> "application_1653565036000_00001"
+ ))
+ )
+ )
+
+ val run2 = RunFactory.getDummyRun(dataset = "dataset2",
+ controlMeasure = RunFactory.getDummyControlMeasure(
+ metadata = RunFactory.getDummyMetadata(additionalInfo = Map(
+ "conform_application_id" -> "application_1653565036000_00002"
+ ))
+ )
+ )
+ runFixture.add(run1, run2)
+
+ // get summary of run1 by std app_id
+ val response = sendGet[Array[RunSummary]](s"$apiUrl?sparkAppId=application_1653565036000_00001")
+ response.getBody shouldBe Seq(run1).map(_.toSummary)
+
+ // get summary of run2 by conform app_id
+ val response2 = sendGet[Array[RunSummary]](s"$apiUrl?sparkAppId=application_1653565036000_00002")
+ response2.getBody shouldBe Seq(run2).map(_.toSummary)
+ }
+ "latest RunSummaries are queried, but nothing is found" in {
+ val run1 = RunFactory.getDummyRun(dataset = "dataset1", startDateTime = "22-05-2022 14:01:12 +0200")
+ val run2 = RunFactory.getDummyRun(dataset = "dataset3", uniqueId = None)
+ runFixture.add(run1, run2)
+
+ val response = sendGet[String](s"$apiUrl?startDate=2022-05-24")
+ response.getBody shouldBe "[]" // empty array
+ }
+ }
+
+ "return 400" when {
+ "queried on mutually exclusive options" in {
+ val response = sendGet[String](s"$apiUrl?uniqueId=12345678-90ab-cdef-1234-567890abcdef&sparkAppId=appId1")
+ response.getStatusCode shouldBe HttpStatus.BAD_REQUEST
+ response.getBody should include("You may only supply one of [startDate|sparkAppId|uniqueId]")
+
+ val response2 = sendGet[String](s"$apiUrl?startDate=20-02-2022&sparkAppId=appId1")
+ response2.getStatusCode shouldBe HttpStatus.BAD_REQUEST
+ response2.getBody should include("You may only supply one of [startDate|sparkAppId|uniqueId]")
+ }
+ }
+ }
+
+ s"GET $apiUrl/{datasetName}" can {
+ "return 200" when {
+ "latest RunSummaries are queried" in {
+ datasetFixture.add(
+ DatasetFactory.getDummyDataset("dataset1", version = 1),
+ DatasetFactory.getDummyDataset("dataset1", version = 2),
+ DatasetFactory.getDummyDataset("dataset2", version = 1)
+ )
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2)
+ runFixture.add(dataset1ver1run1, dataset1ver1run2)
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1)
+ runFixture.add(dataset1ver2run1)
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1)
+ runFixture.add(dataset2ver1run1) // unrelated to dataset1
+
+ val response = sendGet[String](s"$apiUrl/dataset1")
+ assertOk(response)
+
+ val actual = response.getBody
+ val expected = SerializationUtils.asJson(Seq(dataset1ver1run2, dataset1ver2run1).map(_.toSummary))
+ actual shouldBe expected
+ }
+
+ "latest RunSummaries are queried on startDate" in {
+ datasetFixture.add(
+ DatasetFactory.getDummyDataset("dataset1", version = 1),
+ DatasetFactory.getDummyDataset("dataset1", version = 2),
+ DatasetFactory.getDummyDataset("dataset2", version = 1),
+ DatasetFactory.getDummyDataset("dataset3", version = 1)
+ )
+
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1, startDateTime = "30-01-2022 13:01:12 +0200")
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2, startDateTime = "22-05-2022 14:01:12 +0200")
+
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "19-05-2022 15:01:12 +0200")
+ val dataset1ver2run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 2, startDateTime = "22-05-2022 15:01:12 +0200")
+ val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "01-06-2022 15:01:12 +0200")
+
+ // unrelated to dataset1:
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1, startDateTime = "17-05-2022 13:01:12 +0200")
+ val dataset3ver1run1 = RunFactory.getDummyRun(dataset = "dataset3", datasetVersion = 1, runId = 1, startDateTime = "20-05-2022 13:01:12 +0200")
+ runFixture.add(
+ dataset1ver1run1, dataset1ver1run2,
+ dataset1ver2run1, dataset1ver2run2, dataset1ver2run3,
+ dataset2ver1run1, dataset3ver1run1
+ )
+
+ val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1?startDate=2022-05-20")
+ response.getStatusCode shouldBe HttpStatus.OK
+ val expected = Array(dataset1ver1run2, dataset1ver2run3).map(_.toSummary)
+ response.getBody shouldBe expected
+ }
+
+ "latest RunSummaries are queried on uniqueId" in {
+ val run1 = RunFactory.getDummyRun(dataset = "dataset1", runId = 1, uniqueId = Some("12345678-90ab-cdef-1234-567890abcdef"))
+ val run2 = RunFactory.getDummyRun(dataset = "dataset1", runId = 2, uniqueId = Some(UUID.randomUUID().toString)) // some other id
+ val run3 = RunFactory.getDummyRun(dataset = "dataset1", runId = 3, uniqueId = None)
+ val run4 = RunFactory.getDummyRun(dataset = "dataset2", uniqueId = None) // unrelated to dataset1
+ runFixture.add(run1, run2, run3, run4)
+
+ val response = sendGet[Array[RunSummary]](s"$apiUrl?uniqueId=12345678-90ab-cdef-1234-567890abcdef")
+ response.getStatusCode shouldBe HttpStatus.OK
+ val expected = Array(run1).map(_.toSummary)
+ response.getBody shouldBe expected
+ }
+
+ "latest RunSummaries are queried, but nothing is found" in {
+ datasetFixture.add(
+ DatasetFactory.getDummyDataset("dataset1", version = 1),
+ DatasetFactory.getDummyDataset("dataset3", version = 1)
+ )
+ val run1 = RunFactory.getDummyRun(dataset = "dataset1", startDateTime = "22-05-2022 14:01:12 +0200")
+ val run2 = RunFactory.getDummyRun(dataset = "dataset3", uniqueId = None) // unrelated to dataset1
+ runFixture.add(run1, run2)
+
+ val response = sendGet[String](s"$apiUrl/dataset1?startDate=2022-05-24")
+ response.getStatusCode shouldBe HttpStatus.OK
+ response.getBody shouldBe "[]" // empty array
+ }
+ "return RunSummaries by dataset name - ok even for no runs for known dataset" in {
+ // datasets referenced by runs must exist, too
+ datasetFixture.add(
+ DatasetFactory.getDummyDataset("dataset1", version = 1)
+ )
+
+ val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1")
+ response.getStatusCode shouldBe HttpStatus.OK
+
+ val expected = List.empty[RunSummary]
+ response.getBody shouldBe expected
+ }
+ }
+
+ "return 404" when {
+ "RunSummaries for non-existent dataset name is queried" in {
+ // datasets referenced by runs must exist
+ val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1")
+ response.getStatusCode shouldBe HttpStatus.NOT_FOUND
+ }
+ }
+
+ }
+
+ s"GET $apiUrl/{datasetName}/{datasetVersion}" can {
+ "return 200" when {
+ "return RunSummaries by dataset name and version" in {
+ // datasets referenced by runs must exist, too
+ datasetFixture.add(
+ DatasetFactory.getDummyDataset("dataset1", version = 1),
+ DatasetFactory.getDummyDataset("dataset1", version = 2),
+ DatasetFactory.getDummyDataset("dataset2", version = 1)
+ )
+
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2)
+ runFixture.add(dataset1ver1run1, dataset1ver1run2)
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1)
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1)
+ runFixture.add(dataset1ver2run1, dataset2ver1run1)
+
+ val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1/1")
+ response.getStatusCode shouldBe HttpStatus.OK
+
+ val expected = List(dataset1ver1run1, dataset1ver1run2).map(_.toSummary)
+ response.getBody shouldBe expected
+ }
+
+ "return RunSummaries on combination of (startDate, dsName, and dsVersion)" in {
+ // datasets referenced by runs must exist, too
+ datasetFixture.add(
+ DatasetFactory.getDummyDataset("dataset1", version = 1),
+ DatasetFactory.getDummyDataset("dataset1", version = 2),
+ DatasetFactory.getDummyDataset("dataset3", version = 1)
+ )
+
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2, startDateTime = "22-05-2022 14:01:12 +0200")
+
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "30-01-2022 15:01:12 +0200")
+ val dataset1ver2run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 2, startDateTime = "20-05-2022 15:01:12 +0200")
+ val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "01-06-2022 15:01:12 +0200")
+
+ val dataset3ver1run1 = RunFactory.getDummyRun(dataset = "dataset3", datasetVersion = 1, runId = 1, startDateTime = "21-05-2022 13:01:12 +0200")
+
+ runFixture.add(
+ dataset1ver1run2,
+ dataset1ver2run1, dataset1ver2run2, dataset1ver2run3,
+ dataset3ver1run1
+ )
+
+ val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1/2?startDate=2022-05-20")
+ response.getStatusCode shouldBe HttpStatus.OK
+
+ val expected = List(dataset1ver2run2, dataset1ver2run3).map(_.toSummary)
+ response.getBody shouldBe expected
+
+ }
+
+ "return RunSummaries by dataset name and version - ok even for no runs for known dataset" in {
+ // datasets referenced by runs must exist, too
+ datasetFixture.add(
+ DatasetFactory.getDummyDataset("dataset1", version = 1)
+ )
+
+ val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1/1")
+ response.getStatusCode shouldBe HttpStatus.OK
+
+ val expected = List.empty[RunSummary]
+ response.getBody shouldBe expected
+ }
+ }
+
+ "return 404" when {
+ "RunSummaries for non-existent dataset name and version are queried" in {
+ // datasets referenced by runs must exist
+ datasetFixture.add(DatasetFactory.getDummyDataset("dataset1", version = 1)) // v1 exists
+
+ val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1/2") // but v2 does not
+ response.getStatusCode shouldBe HttpStatus.NOT_FOUND
+ }
+ }
+ }
+
+ s"POST $apiUrl/{datasetName}/{datasetVersion}" can {
+ "return 201" when {
+ "new Run is created (basic case)" in {
+ datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset")) // dataset ref'd by the run
+ val run = RunFactory.getDummyRun()
+
+ val response = sendPost[Run, String](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run))
+ assertCreated(response)
+ val locationHeader = response.getHeaders.getFirst("Location")
+ locationHeader should endWith("/api-v3/runs/dummyDataset/1/1")
+
+ val response2 = sendGet[Run](s"$apiUrl/dummyDataset/1/1")
+ assertOk(response2)
+ response2.getBody shouldBe run
+ }
+ "created run provides a uniqueId if none is specified" in {
+ datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset"))
+ val run = RunFactory.getDummyRun(uniqueId = None)
+ val response = sendPost[Run, String](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run))
+
+ assertCreated(response)
+ val locationHeader = response.getHeaders.getFirst("Location")
+ locationHeader should endWith("/api-v3/runs/dummyDataset/1/1")
+
+ val response2 = sendGet[Run](s"$apiUrl/dummyDataset/1/1")
+ assertOk(response2)
+ val body2 = response2.getBody
+ body2.uniqueId shouldBe defined
+ body2 shouldBe run.copy(uniqueId = body2.uniqueId)
+ }
+ "created run generates a runId=1 for the first run" in {
+ datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset"))
+ val run = RunFactory.getDummyRun(runId = 123) // specified runId is ignored
+
+ val response = sendPost[Run, String](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run))
+ assertCreated(response)
+ val locationHeader = response.getHeaders.getFirst("Location")
+ locationHeader should endWith("/api-v3/runs/dummyDataset/1/1")
+
+ val response2 = sendGet[Run](s"$apiUrl/dummyDataset/1/1")
+ assertOk(response2)
+ response2.getBody shouldBe run.copy(runId = 1) // no runs present, so runId = 1
+ }
+ "created run generates a subsequent runId" in {
+ datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset"))
+ runFixture.add(
+ RunFactory.getDummyRun(runId = 1),
+ RunFactory.getDummyRun(runId = 2)
+ )
+ val run = RunFactory.getDummyRun(runId = 222) // specified runId is ignored, the next sequential id is used
+
+ val response = sendPost[Run, String](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run))
+ assertCreated(response)
+ val locationHeader = response.getHeaders.getFirst("Location")
+ locationHeader should endWith("/api-v3/runs/dummyDataset/1/3")
+
+ val response2 = sendGet[Run](s"$apiUrl/dummyDataset/1/3")
+ assertOk(response2)
+ response2.getBody shouldBe run.copy(runId = 3) // runs 1 and 2 were already present
+ }
+ "handles two runs being started simultaneously" in {
+ datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset"))
+ val run1 = RunFactory.getDummyRun()
+ val run2 = RunFactory.getDummyRun()
+ run1.uniqueId should not be run2.uniqueId
+
+ // no guarantee which is first
+ val response1 = await(sendPostAsync[Run, String](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run1)))
+ val response2 = await(sendPostAsync[Run, String](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run2)))
+
+ Set(response1, response2).foreach(_.getStatusCode shouldBe HttpStatus.CREATED)
+ Set(response1, response2).map(resp => stripBaseUrl(resp.getHeaders.getFirst("Location"))) shouldBe
+ Set("/runs/dummyDataset/1/1", "/runs/dummyDataset/1/2")
+
+ val retrieved1 = sendGet[Run](s"$apiUrl/dummyDataset/1/1")
+ val retrieved2 = sendGet[Run](s"$apiUrl/dummyDataset/1/2")
+ Set(retrieved1, retrieved2).foreach(_.getStatusCode shouldBe HttpStatus.OK)
+
+ retrieved1.getBody shouldBe run1.copy(runId = retrieved1.getBody.runId) // the actually assigned runId is used
+ retrieved2.getBody shouldBe run2.copy(runId = retrieved2.getBody.runId)
+ }
+ }
+
+ "return 400" when {
+ "sending incorrect run data" in {
+ val response = sendPost[String, String](s"$apiUrl/dummyDataset/1", bodyOpt = Some("{}"))
+
+ assertBadRequest(response)
+
+ val body = response.getBody
+ body should include("URL and payload entity name mismatch: 'dummyDataset' != 'null'")
+ }
+ "sending mismatched URL/payload data" in {
+ val response1 = sendPost[Run, String](s"$apiUrl/datasetB/2",
+ bodyOpt = Some(RunFactory.getDummyRun(dataset = "datasetA", datasetVersion = 2)))
+ assertBadRequest(response1)
+ response1.getBody should include("URL and payload entity name mismatch: 'datasetB' != 'datasetA'")
+
+ val response2 = sendPost[Run, String](s"$apiUrl/datasetC/2",
+ bodyOpt = Some(RunFactory.getDummyRun(dataset = "datasetC", datasetVersion = 3)))
+ assertBadRequest(response2)
+ response2.getBody should include("URL and payload entity version mismatch: 2 != 3")
+ }
+ "a Run with the given uniqueId already exists" in {
+ datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset"))
+ val uniqueId = "ed9fd163-f9ac-46f8-9657-a09a4e3fb6e9"
+ val presentRun = RunFactory.getDummyRun(uniqueId = Option(uniqueId))
+ runFixture.add(presentRun)
+ val run = RunFactory.getDummyRun(uniqueId = Option(uniqueId))
+
+ val response = sendPost[Run, Validation](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run))
+
+ assertBadRequest(response)
+
+ val body = response.getBody
+ assert(!body.isValid)
+ assert(body == Validation().withError("uniqueId", s"run with this uniqueId already exists: $uniqueId"))
+ }
+ "a Run references a non-existing dataset" in {
+ // dataset ref'd by the run does not exist
+ val run = RunFactory.getDummyRun()
+
+ val response = sendPost[Run, Validation](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run))
+ response.getStatusCode shouldBe HttpStatus.BAD_REQUEST
+
+ response.getBody shouldBe Validation.empty.withError("dataset", "Dataset dummyDataset v1 not found!")
+ }
+ }
+ }
+
+ s"GET $apiUrl/{datasetName}/{datasetVersion}/{runId}" can {
+ "return 200" when {
+ "return Run by dataset name, version, and runId" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2)
+ runFixture.add(dataset1ver1run1, dataset1ver1run2)
+
+ val response = sendGet[String](s"$apiUrl/dataset1/1/2")
+ response.getStatusCode shouldBe HttpStatus.OK
+
+ response.getBody shouldBe
+ s"""{
+ |"uniqueId":"${dataset1ver1run2.uniqueId.get}",
+ |"runId":2,
+ |"dataset":"dataset1",
+ |"datasetVersion":1,
+ |"splineRef":{"sparkApplicationId":"dummySparkApplicationId","outputPath":"dummyOutputPath"},
+ |"startDateTime":"${dataset1ver1run2.startDateTime}",
+ |"runStatus":{"status":"allSucceeded","error":null},
+ |"controlMeasure":{
+ |"metadata":{
+ |"sourceApplication":"dummySourceApplication",
+ |"country":"dummyCountry",
+ |"historyType":"dummyHistoryType",
+ |"dataFilename":"dummyDataFilename",
+ |"sourceType":"dummySourceType",
+ |"version":1,
+ |"informationDate":"04-12-2017 16:19:17 +0200",
+ |"additionalInfo":{}
+ |},
+ |"runUniqueId":"${dataset1ver1run2.controlMeasure.runUniqueId.get}",
+ |"checkpoints":[]
+ |}
+ |}""".stripMargin.replaceAll("\n", "")
+ }
+ }
+ "return 400" when {
+ "run does not exists" in {
+ val response = sendGet[String](s"$apiUrl/dataset1/1/2")
+ response.getStatusCode shouldBe HttpStatus.NOT_FOUND
+ }
+ }
+ }
+
+ s"PUT $apiUrl/{datasetName}/{datasetVersion}/{runId}" can {
+ "return 200" when {
+ "run is updated (running -> allSucceeded)" in {
+ val run = RunFactory.getDummyRun(runStatus = RunStatus(RunState.running, None))
+ runFixture.add(run)
+
+ val response = sendPut[RunStatus, MessageWrapper](s"$apiUrl/dummyDataset/1/1",
+ bodyOpt = Option(RunStatus(RunState.allSucceeded, None)))
+ assertOk(response)
+ response.getBody shouldBe MessageWrapper("New runStatus RunStatus(allSucceeded,None) applied.")
+
+ val response2 = sendGet[Run](s"$apiUrl/dummyDataset/1/1")
+ assertOk(response2)
+ response2.getBody shouldBe run.copy(runStatus = RunStatus(RunState.allSucceeded, None))
+ }
+ "run is updated (running -> failed with error)" in {
+ val run = RunFactory.getDummyRun(runStatus = RunStatus(RunState.running, None))
+ runFixture.add(run)
+
+ val runError = RunError("job1", "step2", "desc3", "details4")
+ val newRunStatus = RunStatus(RunState.failed, Some(runError))
+ val response = sendPut[RunStatus, String](s"$apiUrl/dummyDataset/1/1",
+ bodyOpt = Option(newRunStatus))
+ assertOk(response)
+ response.getBody shouldBe
+ """{"message":"New runStatus RunStatus(failed,Some(RunError(job1,step2,desc3,details4))) applied."}"""
+
+ val response2 = sendGet[Run](s"$apiUrl/dummyDataset/1/1")
+ assertOk(response2)
+ response2.getBody shouldBe run.copy(runStatus = newRunStatus)
+ }
+ }
+
+ "return 400" when {
+ "sending incorrect run data" in {
+ val run = RunFactory.getDummyRun(runStatus = RunStatus(RunState.running, None))
+ runFixture.add(run)
+ val response = sendPut[String, String](s"$apiUrl/dummyDataset/1/1",
+ bodyOpt = Some("""{"badField":123}"""))
+ assertBadRequest(response)
+
+ val body = response.getBody
+ body should include("Invalid empty RunStatus submitted")
+ }
+ }
+ "return 404" when {
+ "a RunState-update references a non-existing run" in {
+ // Run ref'd by the run does not exits
+ val response = sendPut[RunStatus, Validation](s"$apiUrl/dummyDataset/1/1",
+ bodyOpt = Option(RunStatus(RunState.allSucceeded, None)))
+ response.getStatusCode shouldBe HttpStatus.NOT_FOUND
+ }
+ }
+ }
+
+ s"GET $apiUrl/{datasetName}/{datasetVersion}/{runId}/checkpoints" can {
+ "return 200" when {
+ "return Checkpoints by dataset name, version, and runId (empty)" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ runFixture.add(dataset1ver1run1)
+
+ val response = sendGet[Array[Checkpoint]](s"$apiUrl/dataset1/1/1/checkpoints")
+ response.getStatusCode shouldBe HttpStatus.OK
+
+ response.getBody shouldBe Seq.empty[Checkpoint]
+ }
+ "return Checkpoints by dataset name, version, and runId (non-empty)" in {
+ import RunFactory._
+ val dataset1ver1run1 = getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1,
+ controlMeasure = RunFactory.getDummyControlMeasure(checkpoints = List(
+ RunFactory.getDummyCheckpoint(name = "cp1", order = 0, controls = List(getDummyMeasurement(controlValue = 3))),
+ RunFactory.getDummyCheckpoint(name = "cp2", order = 1, controls = List(getDummyMeasurement(controlValue = "asdf")))
+ )))
+ runFixture.add(dataset1ver1run1)
+
+ val response = sendGet[String](s"$apiUrl/dataset1/1/1/checkpoints")
+ response.getStatusCode shouldBe HttpStatus.OK
+
+ response.getBody shouldBe
+ """[{
+ |"name":"cp1","software":null,"version":null,
+ |"processStartTime":"04-12-2017 16:19:17 +0200","processEndTime":"04-12-2017 16:19:17 +0200",
+ |"workflowName":"dummyWorkFlowName","order":0,"controls":[
+ |{"controlName":"dummyControlName","controlType":"dummyControlType","controlCol":"dummyControlCol","controlValue":3}
+ |]
+ |},{
+ |"name":"cp2","software":null,"version":null,
+ |"processStartTime":"04-12-2017 16:19:17 +0200","processEndTime":"04-12-2017 16:19:17 +0200",
+ |"workflowName":"dummyWorkFlowName","order":1,"controls":[
+ |{"controlName":"dummyControlName","controlType":"dummyControlType","controlCol":"dummyControlCol","controlValue":"asdf"}
+ |]
+ |}]""".stripMargin.replaceAll("\n", "")
+ }
+ }
+ "return 404" when {
+ "run for checkpoint does not exists" in {
+ val response = sendGet[String](s"$apiUrl/dataset1/2/3/checkpoints")
+ response.getStatusCode shouldBe HttpStatus.NOT_FOUND
+ }
+ }
+ }
+
+ s"POST $apiUrl/{datasetName}/{datasetVersion}/{runId}/checkpoints" can {
+ "return 201" when {
+ "add a new checkpoint" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ runFixture.add(dataset1ver1run1)
+
+ val checkpoint1 = RunFactory.getDummyCheckpoint("cp1")
+ val response = sendPost[Checkpoint, String](s"$apiUrl/dataset1/1/1/checkpoints", bodyOpt = Some(checkpoint1))
+ response.getStatusCode shouldBe HttpStatus.CREATED
+ val locationHeader = response.getHeaders.getFirst("Location")
+ locationHeader should endWith("/api-v3/runs/dataset1/1/1/checkpoints/cp1")
+
+ val response2 = sendGet[Array[Checkpoint]](s"$apiUrl/dataset1/1/1/checkpoints")
+ assertOk(response2)
+ response2.getBody shouldBe Seq(checkpoint1)
+ }
+ }
+
+ "return 400" when {
+ "adding a checkpoint duplicate by name" in {
+ val checkpoint1 = RunFactory.getDummyCheckpoint("cp1")
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1,
+ controlMeasure = RunFactory.getDummyControlMeasure(checkpoints = List(checkpoint1)))
+ runFixture.add(dataset1ver1run1)
+
+ val cp1dupl = checkpoint1.copy(order = 1, workflowName = "wf2") // same name CP
+
+ val response = sendPost[Checkpoint, Validation](s"$apiUrl/dataset1/1/1/checkpoints", bodyOpt = Some(cp1dupl))
+ response.getStatusCode shouldBe HttpStatus.BAD_REQUEST
+ response.getBody shouldBe Validation.empty.withError("checkpoint.name", "Checkpoint with name cp1 already exists!")
+ }
+ }
+
+ "return 404" when {
+ "run for checkpoint does not exists" in {
+ val checkpoint1 = RunFactory.getDummyCheckpoint("cp1")
+ val response = sendPost[Checkpoint, String](s"$apiUrl/dataset1/1/1/checkpoints", bodyOpt = Some(checkpoint1))
+ response.getStatusCode shouldBe HttpStatus.NOT_FOUND
+ }
+ }
+ }
+
+ s"GET $apiUrl/{datasetName}/{datasetVersion}/{runId}/checkpoints/{checkpointName}" can {
+ "return 200" when {
+ "return Checkpoint by dataset name, version, runId, and checkpoint name" in {
+ val cp1 = RunFactory.getDummyCheckpoint(name = "cp1", order = 0, controls = List(getDummyMeasurement(controlValue = 3)))
+ val cp2 = RunFactory.getDummyCheckpoint(name = "cp2", order = 1, controls = List(getDummyMeasurement(controlValue = "asdf")))
+ val dataset1ver1run1 = getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1,
+ controlMeasure = RunFactory.getDummyControlMeasure(checkpoints = List(cp1, cp2))
+ )
+ runFixture.add(dataset1ver1run1)
+
+ val response = sendGet[Checkpoint](s"$apiUrl/dataset1/1/1/checkpoints/cp1")
+ response.getStatusCode shouldBe HttpStatus.OK
+
+ response.getBody shouldBe cp1
+ }
+
+ }
+ "return 404" when {
+ "run does not exists" in {
+ val response = sendGet[String](s"$apiUrl/dataset1/2/3/checkpoints/namedCp")
+ response.getStatusCode shouldBe HttpStatus.NOT_FOUND
+ }
+ "cp in the run does not exists" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ runFixture.add(dataset1ver1run1)
+
+ val response = sendGet[String](s"$apiUrl/dataset1/1/1/checkpoints/namedCp")
+ response.getStatusCode shouldBe HttpStatus.NOT_FOUND
+ }
+ }
+ }
+
+ s"GET $apiUrl/{datasetName}/{datasetVersion}/{runId}/metadata" can {
+ "return 200" when {
+ "return Metadata by dataset name, version, and runId" in {
+ import RunFactory._
+ val testMetadata = getDummyMetadata(additionalInfo = Map("extra_field" -> "extra_value"))
+ val dataset1ver1run1 = getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1,
+ controlMeasure = getDummyControlMeasure(metadata = testMetadata)
+ )
+ runFixture.add(dataset1ver1run1)
+
+ val response = sendGet[ControlMeasureMetadata](s"$apiUrl/dataset1/1/1/metadata")
+ response.getStatusCode shouldBe HttpStatus.OK
+
+ response.getBody shouldBe testMetadata
+ }
+ }
+ "return 404" when {
+ "run for metadata does not exists" in {
+ val response = sendGet[String](s"$apiUrl/dataset1/2/3/metadata")
+ response.getStatusCode shouldBe HttpStatus.NOT_FOUND
+ }
+ }
+ }
+}
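The suite above builds every expectation with `_.toSummary` from `RunImplicits.RunExtensions`. As a reading aid, here is a minimal sketch of what such an extension could look like; this is a hypothetical reconstruction, not the project's actual `RunImplicits`, and the real field set of `za.co.absa.enceladus.rest_api.models.RunSummary` may differ:

    // Hypothetical sketch only: the RunSummary field names below are assumed.
    object RunImplicits {
      implicit class RunExtensions(val run: Run) extends AnyVal {
        def toSummary: RunSummary = RunSummary(
          datasetName = run.dataset,              // assumed field mapping
          datasetVersion = run.datasetVersion,
          runId = run.runId,
          status = run.runStatus.status.toString, // assumed status representation
          startDateTime = run.startDateTime
        )
      }
    }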
diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/SchemaControllerV3IntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/SchemaControllerV3IntegrationSuite.scala
index 1b4898402..19a73cda9 100644
--- a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/SchemaControllerV3IntegrationSuite.scala
+++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/SchemaControllerV3IntegrationSuite.scala
@@ -88,7 +88,7 @@ class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAn
val response = sendPost[Schema, Validation](apiUrl, bodyOpt = Some(schema))
assertCreated(response)
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/schemas/schemaA/1")
response.getBody shouldBe Validation.empty
@@ -143,7 +143,7 @@ class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAn
val response = sendPut[Schema, Validation](s"$apiUrl/schemaA/1", bodyOpt = Some(schema2))
assertCreated(response)
- val locationHeader = response.getHeaders.getFirst("location")
+ val locationHeader = response.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/schemas/schemaA/2")
response.getBody shouldBe Validation.empty
@@ -362,7 +362,7 @@ class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAn
val responseUploaded = sendPostUploadFile[Validation](
s"$apiUrl/schemaA/1/from-file", TestResourcePath.Copybook.ok, schemaParams)
assertCreated(responseUploaded)
- val locationHeader = responseUploaded.getHeaders.getFirst("location")
+ val locationHeader = responseUploaded.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version
val response2 = sendGet[Schema]("/schemas/schemaA/2")
@@ -384,7 +384,7 @@ class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAn
val responseUploaded = sendPostUploadFile[Validation](
s"$apiUrl/schemaA/1/from-file", TestResourcePath.Json.ok, schemaParams)
assertCreated(responseUploaded)
- val locationHeader = responseUploaded.getHeaders.getFirst("location")
+ val locationHeader = responseUploaded.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version
val response2 = sendGet[Schema]("/schemas/schemaA/2")
@@ -406,7 +406,7 @@ class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAn
val responseUploaded = sendPostUploadFile[Schema](
s"$apiUrl/schemaA/1/from-file", TestResourcePath.Avro.ok, schemaParams)
assertCreated(responseUploaded)
- val locationHeader = responseUploaded.getHeaders.getFirst("location")
+ val locationHeader = responseUploaded.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version
val response2 = sendGet[Schema]("/schemas/schemaA/2")
@@ -574,7 +574,7 @@ class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAn
assertCreated(responseRemoteLoaded)
responseRemoteLoaded.getBody shouldBe Validation.empty
- val locationHeader = responseRemoteLoaded.getHeaders.getFirst("location")
+ val locationHeader = responseRemoteLoaded.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version
val response2 = sendGet[Schema]("/schemas/schemaA/2")
@@ -599,7 +599,7 @@ class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAn
val responseRemoteLoaded = sendPostRemoteFile[Validation](s"$apiUrl/schemaA/1/from-remote-uri", params)
assertCreated(responseRemoteLoaded)
responseRemoteLoaded.getBody shouldBe Validation.empty
- val locationHeader = responseRemoteLoaded.getHeaders.getFirst("location")
+ val locationHeader = responseRemoteLoaded.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version
val response2 = sendGet[Schema]("/schemas/schemaA/2")
@@ -625,7 +625,7 @@ class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAn
assertCreated(responseRemoteLoaded)
responseRemoteLoaded.getBody shouldBe Validation.empty
- val locationHeader = responseRemoteLoaded.getHeaders.getFirst("location")
+ val locationHeader = responseRemoteLoaded.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version
val response2 = sendGet[Schema]("/schemas/schemaA/2")
@@ -730,7 +730,7 @@ class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAn
assertCreated(responseRemoteLoaded)
responseRemoteLoaded.getBody shouldBe Validation.empty
- val locationHeader = responseRemoteLoaded.getHeaders.getFirst("location")
+ val locationHeader = responseRemoteLoaded.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version
val response2 = sendGet[Schema]("/schemas/schemaA/2")
@@ -757,7 +757,7 @@ class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAn
assertCreated(responseRemoteLoaded)
responseRemoteLoaded.getBody shouldBe Validation.empty
- val locationHeader = responseRemoteLoaded.getHeaders.getFirst("location")
+ val locationHeader = responseRemoteLoaded.getHeaders.getFirst("Location")
locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version
val response2 = sendGet[Schema]("/schemas/schemaA/2")
diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/repositories/RunRepositoryIntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/repositories/RunRepositoryIntegrationSuite.scala
index 443b57bbc..7ab64b2b6 100644
--- a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/repositories/RunRepositoryIntegrationSuite.scala
+++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/repositories/RunRepositoryIntegrationSuite.scala
@@ -19,7 +19,7 @@ import java.time.{LocalDate, ZoneId}
import java.time.format.DateTimeFormatter
import com.mongodb.MongoWriteException
import org.junit.runner.RunWith
-import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.{Autowired, Qualifier}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.ActiveProfiles
import org.springframework.test.context.junit4.SpringRunner
@@ -31,6 +31,8 @@ import za.co.absa.enceladus.model.Run
import za.co.absa.enceladus.model.test.factories.RunFactory
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer
+import java.util.UUID
+
@RunWith(classOf[SpringRunner])
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles(Array("withEmbeddedMongo"))
@@ -42,6 +44,7 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
private val runFixture: RunFixtureService = null
@Autowired
+ @Qualifier("runMongoRepository") // to correctly wire V2 runMongoRepository
private val runMongoRepository: RunMongoRepository = null
override def fixtures: List[FixtureService[_]] = List(runFixture)
@@ -90,6 +93,31 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
}
}
+ val uuid1 = UUID.randomUUID().toString
+ "RunMongoRepository::getByUniqueId" should {
+ "return a defined Option of the Run when exists" when {
+ "there is a Run of the specified Dataset" in {
+ val dataset1run1 = RunFactory.getDummyRun(dataset = "dataset", datasetVersion = 1, runId = 1)
+ val dataset1run2 = RunFactory.getDummyRun(dataset = "dataset", datasetVersion = 1, runId = 2, uniqueId = Some(uuid1))
+ val dataset2run2 = RunFactory.getDummyRun(dataset = "dataset", datasetVersion = 2, runId = 2)
+ runFixture.add(dataset1run1, dataset1run2, dataset2run2)
+
+ val actual = await(runMongoRepository.getByUniqueId(uuid1))
+ assert(actual.contains(dataset1run2))
+ }
+ }
+
+ "return None when not found" when {
+ "there is no Run with the specified datasetName" in {
+ setUpSimpleRun()
+
+ val otherId = UUID.randomUUID().toString
+ val actual = await(runMongoRepository.getByUniqueId(otherId))
+ assert(actual.isEmpty)
+ }
+ }
+ }
+
"RunMongoRepository::getByStartDate" should {
val startDate = "28-01-2019"
@@ -213,7 +241,7 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
val dataset2v2run2 = RunFactory.getDummyRun(dataset = dataset2Name, datasetVersion = 2, runId = 2, startDateTime = "05-12-2018 06:00:00 +0200")
runFixture.add(dataset1v1run1, dataset1v1run2, dataset1v2run1, dataset2v1run1, dataset2v2run1, dataset2v2run2)
- val actual = await(runMongoRepository.getRunSummariesPerDatasetName())
+ val actual = await(runMongoRepository.getGroupedRunSummariesPerDatasetName())
val dataset1Summary = RunDatasetNameGroupedSummary("dataset1", 3, "04-12-2018 16:19:17 +0200")
val dataset2Summary = RunDatasetNameGroupedSummary("dataset2", 3, "05-12-2018 06:00:00 +0200")
@@ -235,7 +263,7 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
val dataset1v1run2 = RunFactory.getDummyRun(dataset = dataset1Name, datasetVersion = 1, runId = 2, startDateTime = "04-12-2018 13:00:00 +0200")
runFixture.add(dataset1v1run2)
- val actual = await(runMongoRepository.getRunSummariesPerDatasetName())
+ val actual = await(runMongoRepository.getGroupedRunSummariesPerDatasetName())
val dataset1Summary = RunDatasetNameGroupedSummary("dataset1", 3, "04-12-2018 16:19:17 +0200")
val dataset2Summary = RunDatasetNameGroupedSummary("dataset2", 3, "05-12-2018 06:00:00 +0200")
@@ -246,7 +274,7 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
"there are no Run entities stored in the database" should {
"return an empty collection" in {
- val actual = await(runMongoRepository.getRunSummariesPerDatasetName())
+ val actual = await(runMongoRepository.getGroupedRunSummariesPerDatasetName())
assert(actual.isEmpty)
}
@@ -267,7 +295,7 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
val dataset2v2run2 = RunFactory.getDummyRun(dataset = wrongDatasetName, datasetVersion = 2, runId = 2, startDateTime = "05-12-2018 06:00:00 +0200")
runFixture.add(dataset1v1run1, dataset1v1run2, dataset1v2run1, dataset2v1run1, dataset2v2run1, dataset2v2run2)
- val actual = await(runMongoRepository.getRunSummariesPerDatasetVersion(queriedDatasetName))
+ val actual = await(runMongoRepository.getGroupedRunSummariesPerDatasetVersion(queriedDatasetName))
val dataset1v1Summary = RunDatasetVersionGroupedSummary(queriedDatasetName, 1, 2, "04-12-2018 13:00:00 +0200")
val dataset1v2Summary = RunDatasetVersionGroupedSummary(queriedDatasetName, 2, 1, "04-12-2018 16:19:17 +0200")
@@ -281,7 +309,7 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
val dataset1v1run1 = RunFactory.getDummyRun(dataset = queriedDatasetName, datasetVersion = 1, runId = 1, startDateTime = "03-12-2018 12:00:00 +0200")
runFixture.add(dataset1v1run1)
- val actual = await(runMongoRepository.getRunSummariesPerDatasetVersion(queriedDatasetName))
+ val actual = await(runMongoRepository.getGroupedRunSummariesPerDatasetVersion(queriedDatasetName))
val dataset1v1Summary = RunDatasetVersionGroupedSummary(queriedDatasetName, 1, 1, "03-12-2018 12:00:00 +0200")
val dataset1v2Summary = RunDatasetVersionGroupedSummary(queriedDatasetName, 2, 1, "04-12-2018 16:19:17 +0200")
@@ -295,7 +323,7 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
val run = RunFactory.getDummyRun(dataset = wrongDatasetName, datasetVersion = 1, runId = 1)
runFixture.add(run)
- val actual = await(runMongoRepository.getRunSummariesPerDatasetVersion(queriedDatasetName))
+ val actual = await(runMongoRepository.getGroupedRunSummariesPerDatasetVersion(queriedDatasetName))
assert(actual.isEmpty)
}
@@ -526,7 +554,7 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
val checkpoint1 = RunFactory.getDummyCheckpoint(name = "checkpoint1")
- val actual = await(runMongoRepository.appendCheckpoint(uniqueId, checkpoint1))
+ val actual = await(runMongoRepository.appendCheckpointByUniqueId(uniqueId, checkpoint1))
val expectedControlMeasure = run.controlMeasure.copy(checkpoints = List(checkpoint0, checkpoint1))
val expected = run.copy(controlMeasure = expectedControlMeasure)
@@ -538,7 +566,7 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
"there is no Run with the specified uniqueId" in {
val checkpoint = RunFactory.getDummyCheckpoint()
- val actual = await(runMongoRepository.appendCheckpoint(uniqueId, checkpoint))
+ val actual = await(runMongoRepository.appendCheckpointByUniqueId(uniqueId, checkpoint))
assert(actual.isEmpty)
}
@@ -603,7 +631,7 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
}
}
- "RunMongoRepository::updateRunStatus" should {
+ "RunMongoRepository::updateRunStatus(uniqueId, newRunStatus)" should {
val uniqueId = "ed9fd163-f9ac-46f8-9657-a09a4e3fb6e9"
"update the Run's RunStatus and return the updated Run" when {
@@ -632,6 +660,35 @@ class RunRepositoryIntegrationSuite extends BaseRepositoryTest {
}
}
+ "RunMongoRepository::updateRunStatus(datasetName, datasetVersion, runId, newRunStatus)" should {
+ "update the Run's RunStatus and return the updated Run" when {
+ "there is a Run with the specified dsName, dsVersion, and runId" in {
+ val originalStatus = RunFactory.getDummyRunStatus(runState = RunState.running)
+ val run = RunFactory.getDummyRun(dataset = "dsA", datasetVersion = 3, runId = 2, runStatus = originalStatus)
+
+ val unrelatedRun1 = RunFactory.getDummyRun(dataset = "dsA", datasetVersion = 3, runId = 1, runStatus = originalStatus)
+ val unrelatedRun2 = RunFactory.getDummyRun(dataset = "dsA", datasetVersion = 1, runId = 2, runStatus = originalStatus)
+ runFixture.add(run, unrelatedRun1, unrelatedRun2)
+
+ val newlySetStatus = RunFactory.getDummyRunStatus(runState = RunState.allSucceeded)
+
+ val actual = await(runMongoRepository.updateRunStatus("dsA", 3, 2, newlySetStatus))
+ val expected = run.copy(runStatus = newlySetStatus)
+ assert(actual.contains(expected))
+
+ // TODO: check via query that the run state changed and unrelated runs stayed untouched
+ // (use runMongoRepository.getRunsForDatasetname or similar when available)
+ }
+ }
+
+ "return None" when {
+ "there is no Run with the specified uniqueId" in {
+ val actual = await(runMongoRepository.updateRunStatus("dsA", 3, 2, RunFactory.getDummyRunStatus()))
+ assert(actual.isEmpty)
+ }
+ }
+ }
+
"RunMongoRepository::getTodaysRuns" should {
"return 0" when {
"there are no runs" in {
diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/repositories/v3/RunRepositoryV3IntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/repositories/v3/RunRepositoryV3IntegrationSuite.scala
new file mode 100644
index 000000000..48c2f6ad6
--- /dev/null
+++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/repositories/v3/RunRepositoryV3IntegrationSuite.scala
@@ -0,0 +1,305 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.enceladus.rest_api.integration.repositories.v3
+
+import org.junit.runner.RunWith
+import org.scalatest.matchers.should.Matchers
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.context.ActiveProfiles
+import org.springframework.test.context.junit4.SpringRunner
+import za.co.absa.enceladus.model.Run
+import za.co.absa.enceladus.model.test.factories.RunFactory
+import za.co.absa.enceladus.rest_api.integration.fixtures.{FixtureService, RunFixtureService}
+import za.co.absa.enceladus.rest_api.integration.repositories.BaseRepositoryTest
+import za.co.absa.enceladus.rest_api.repositories.v3.RunMongoRepositoryV3
+import za.co.absa.enceladus.utils.time.TimeZoneNormalizer
+
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDate, ZoneId}
+import java.util.UUID
+
+@RunWith(classOf[SpringRunner])
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@ActiveProfiles(Array("withEmbeddedMongo"))
+class RunRepositoryV3IntegrationSuite extends BaseRepositoryTest with Matchers {
+
+ import za.co.absa.enceladus.rest_api.integration.RunImplicits.RunExtensions
+
+ @Autowired
+ private val runFixture: RunFixtureService = null
+
+ @Autowired
+ private val runMongoRepository: RunMongoRepositoryV3 = null
+
+ override def fixtures: List[FixtureService[_]] = List(runFixture)
+
+ private val today = LocalDate.now(ZoneId.of(TimeZoneNormalizer.timeZone)).format(DateTimeFormatter.ofPattern("dd-MM-yyyy"))
+
+ "RunMongoRepository::getRunSummariesLatestOfEach" should {
+ "return only the latest RunSummaries" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2)
+ runFixture.add(dataset1ver1run1, dataset1ver1run2)
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1)
+ runFixture.add(dataset1ver2run1)
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1)
+ runFixture.add(dataset2ver1run1)
+
+ val actual = await(runMongoRepository.getRunSummariesLatestOfEach())
+
+ val expected = List(dataset1ver1run2, dataset1ver2run1, dataset2ver1run1).map(_.toSummary)
+ assert(actual == expected)
+ }
+
+ "return only latest RunSummaries on startDate or later" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1, startDateTime = "30-01-2022 13:01:12 +0200")
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2, startDateTime = "22-05-2022 14:01:12 +0200")
+
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "19-05-2022 15:01:12 +0200")
+ val dataset1ver2run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 2, startDateTime = "22-05-2022 15:01:12 +0200")
+ val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "23-05-2022 15:01:12 +0200")
+
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1, startDateTime = "17-05-2022 13:01:12 +0200")
+ val dataset3ver1run1 = RunFactory.getDummyRun(dataset = "dataset3", datasetVersion = 1, runId = 1, startDateTime = "01-06-2022 13:01:12 +0200")
+
+ runFixture.add(
+ dataset1ver1run1, dataset1ver1run2,
+ dataset1ver2run1, dataset1ver2run2, dataset1ver2run3,
+ dataset2ver1run1, dataset3ver1run1
+ )
+
+ val actual = await(runMongoRepository.getRunSummariesLatestOfEach(startDate = Some(LocalDate.parse("2022-05-20"))))
+ val expected = List(dataset1ver1run2, dataset1ver2run3, dataset3ver1run1).map(_.toSummary)
+ assert(actual == expected)
+ }
+
+ "return only the RunSummaries with the correct uniqueId" in {
+ val r1id = UUID.randomUUID().toString
+ val run1 = RunFactory.getDummyRun(dataset = "dataset1", runId = 1, uniqueId = Some(r1id))
+ val run2 = RunFactory.getDummyRun(dataset = "dataset1", runId = 2, uniqueId = Some(UUID.randomUUID().toString)) // some other id
+ val run3 = RunFactory.getDummyRun(dataset = "datasetX", uniqueId = None)
+
+ runFixture.add(run1, run2, run3)
+
+ val actual = await(runMongoRepository.getRunSummariesLatestOfEach(uniqueId = Some(r1id)))
+ val expected = List(run1).map(_.toSummary)
+ assert(actual == expected)
+ }
+
+ "return only RunSummaries sparkAppId reference" in {
+ val sampleAppId1 = "application_1578585424019_0008" // YARN
+ val sampleAppId2 = "local-1433865536131" // local
+ val sampleAppId3 = "driver-20170926223339-0001" // MESOS
+
+ val run1 = prepareRunWithAppIds(Some(sampleAppId1), None, runId = 1) // std app_id only
+ val run2 = prepareRunWithAppIds(Some(sampleAppId2), Some(sampleAppId3), runId = 2) // both std and conform app_ids
+ runFixture.add(run1, run2)
+
+ // get summary of run1 by std app_id
+ await(runMongoRepository.getRunSummariesLatestOfEach(sparkAppId = Some(sampleAppId1))) shouldBe Seq(run1.toSummary)
+
+ // get summary of run2 by std app_id
+ await(runMongoRepository.getRunSummariesLatestOfEach(sparkAppId = Some(sampleAppId2))) shouldBe Seq(run2.toSummary)
+
+ // get summary of run2 by conform app_id
+ await(runMongoRepository.getRunSummariesLatestOfEach(sparkAppId = Some(sampleAppId3))) shouldBe Seq(run2.toSummary)
+
+ // get nothing by a different sparkAppId
+ await(runMongoRepository.getRunSummariesLatestOfEach(sparkAppId = Some("application_1653565036000_12345"))) shouldBe Seq.empty
+ }
+
+ "return only latest RunSummaries with specific dataset name" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1) // not the latest run
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2)
+
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1)
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1) // not dataset1
+
+ runFixture.add(
+ dataset1ver1run1, dataset1ver1run2,
+ dataset1ver2run1, dataset2ver1run1
+ )
+
+ val actual = await(runMongoRepository.getRunSummariesLatestOfEach(datasetName = Some("dataset1")))
+ val expected = List(dataset1ver1run2, dataset1ver2run1).map(_.toSummary)
+ assert(actual == expected)
+ }
+
+ "return only latest RunSummaries with specific dataset name and version" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1) // not the latest run
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2)
+
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1) // not dataset1 and v1
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1) // not dataset1
+
+ runFixture.add(
+ dataset1ver1run1, dataset1ver1run2,
+ dataset1ver2run1, dataset2ver1run1
+ )
+
+ val actual = await(runMongoRepository.getRunSummariesLatestOfEach(datasetName = Some("dataset1"), datasetVersion = Some(1)))
+ val expected = List(dataset1ver1run2).map(_.toSummary)
+ assert(actual == expected)
+ }
+
+ "return only latest RunSummaries with combination of specific dataset and startDate" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1, startDateTime = "30-01-2000 13:01:12 +0200")
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2, startDateTime = "22-05-2022 14:01:12 +0200")
+
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "19-05-2022 15:01:12 +0200")
+ val dataset1ver2run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 2, startDateTime = "22-05-2022 15:01:12 +0200")
+ val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "12-12-2022 15:01:12 +0200")
+
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1, startDateTime = "17-05-2022 13:01:12 +0200")
+ val dataset3ver1run1 = RunFactory.getDummyRun(dataset = "dataset3", datasetVersion = 1, runId = 1, startDateTime = "20-05-2022 13:01:12 +0200")
+
+ runFixture.add(
+ dataset1ver1run1, dataset1ver1run2,
+ dataset1ver2run1, dataset1ver2run2, dataset1ver2run3,
+ dataset2ver1run1, dataset3ver1run1
+ )
+
+ val actual = await(runMongoRepository.getRunSummariesLatestOfEach(startDate = Some(LocalDate.parse("2022-05-20")),
+ datasetName = Some("dataset1")))
+ val expected = List(dataset1ver1run2, dataset1ver2run3).map(_.toSummary)
+ assert(actual == expected)
+ }
+
+ "fail" when {
+ "multiple options of startDate|sparkAppId|uniqueId are given" in {
+ val TheExpectedErrorMessage = "At most 1 filter of [startDate|sparkAppId|uniqueId] is allowed!"
+
+ (the[IllegalArgumentException] thrownBy {
+ await(runMongoRepository.getRunSummariesLatestOfEach(sparkAppId = Some("sampleAppId1"), uniqueId = Some("adf")))
+ }).getMessage shouldBe TheExpectedErrorMessage
+
+ (the[IllegalArgumentException] thrownBy {
+ await(runMongoRepository.getRunSummariesLatestOfEach(startDate = Some(LocalDate.parse("2020-05-05")), uniqueId = Some("adf")))
+ }).getMessage shouldBe TheExpectedErrorMessage
+ }
+
+ "incorrect combination of dataset, datasetVersion is given (None, Some)" in {
+ val caught = the[IllegalArgumentException] thrownBy {
+ await(runMongoRepository.getRunSummariesLatestOfEach(datasetName = None, datasetVersion = Some(2)))
+ }
+ caught.getMessage should include("Disallowed dataset name/version combination.")
+ }
+ }
+ }
+
+ "RunMongoRepository::getRunSummaries" should {
+ "return all RunSummaries" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2)
+ runFixture.add(dataset1ver1run1, dataset1ver1run2)
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1)
+ runFixture.add(dataset1ver2run1)
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1)
+ runFixture.add(dataset2ver1run1)
+
+ val actual = await(runMongoRepository.getRunSummaries())
+
+ val expected = List(dataset1ver1run1, dataset1ver1run2, dataset1ver2run1, dataset2ver1run1).map(_.toSummary)
+ assert(actual == expected)
+ }
+
+ "return RunSummaries by dataset name" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2)
+ runFixture.add(dataset1ver1run1, dataset1ver1run2)
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1)
+ runFixture.add(dataset1ver2run1)
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1)
+ runFixture.add(dataset2ver1run1)
+
+ val actual = await(runMongoRepository.getRunSummaries(datasetName = Some("dataset1")))
+
+ val expected = List(dataset1ver1run1, dataset1ver1run2, dataset1ver2run1).map(_.toSummary)
+ assert(actual == expected)
+ }
+
+ "return RunSummaries by dataset name and version" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1)
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2)
+ runFixture.add(dataset1ver1run1, dataset1ver1run2)
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1)
+ runFixture.add(dataset1ver2run1)
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1)
+ runFixture.add(dataset2ver1run1)
+
+ val actual = await(runMongoRepository.getRunSummaries(datasetName = Some("dataset1"), datasetVersion = Some(1)))
+
+ val expected = List(dataset1ver1run1, dataset1ver1run2).map(_.toSummary)
+ assert(actual == expected)
+ }
+
+ "return RunSummaries on startDate or later" in {
+ val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1, startDateTime = "30-01-2000 13:01:12 +0200")
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2, startDateTime = "22-05-2022 14:01:12 +0200")
+
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "19-05-2022 15:01:12 +0200")
+ val dataset1ver2run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 2, startDateTime = "22-05-2022 15:01:12 +0200")
+ val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "10-10-2022 15:01:12 +0200")
+
+ val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1, startDateTime = "17-05-2022 13:01:12 +0200")
+ val dataset3ver1run1 = RunFactory.getDummyRun(dataset = "dataset3", datasetVersion = 1, runId = 1, startDateTime = "20-05-2022 13:01:12 +0200")
+
+ runFixture.add(
+ dataset1ver1run1, dataset1ver1run2,
+ dataset1ver2run1, dataset1ver2run2, dataset1ver2run3,
+ dataset2ver1run1, dataset3ver1run1
+ )
+
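+ // the startDate boundary is inclusive: dataset3ver1run1 (started 20-05-2022) is expected,
+ // while dataset1ver2run1 (19-05) and dataset2ver1run1 (17-05) are filtered out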
+ val actual = await(runMongoRepository.getRunSummaries(startDate = Some(LocalDate.parse("2022-05-20"))))
+ val expected = List(dataset1ver1run2, dataset1ver2run2, dataset1ver2run3, dataset3ver1run1).map(_.toSummary)
+ assert(actual == expected)
+ }
+
+ "return RunSummaries on combination of (startDate, dsName, and dsVersion)" in {
+ val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2, startDateTime = "22-05-2022 14:01:12 +0200")
+
+ val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "19-05-2022 15:01:12 +0200")
+ val dataset1ver2run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 2, startDateTime = "20-05-2022 15:01:12 +0200")
+ val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "23-05-2022 15:01:12 +0200")
+
+ val dataset3ver1run1 = RunFactory.getDummyRun(dataset = "dataset3", datasetVersion = 1, runId = 1, startDateTime = "21-05-2022 13:01:12 +0200")
+
+ runFixture.add(
+ dataset1ver1run2,
+ dataset1ver2run1, dataset1ver2run2, dataset1ver2run3,
+ dataset3ver1run1
+ )
+
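+ // only dataset1 v2 runs started on/after 2022-05-20 qualify: run 2 (20-05) and run 3 (23-05)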
+ val actual = await(runMongoRepository.getRunSummaries(
+ datasetName = Some("dataset1"), datasetVersion = Some(2), startDate = Some(LocalDate.parse("2022-05-20"))
+ ))
+ val expected = List(dataset1ver2run2, dataset1ver2run3).map(_.toSummary)
+ assert(actual == expected)
+ }
+ }
+
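+ /**
+ * Creates a dummy Run whose ControlMeasure metadata carries the given application IDs
+ * under "std_application_id"/"conform_application_id" in additionalInfo; keys whose
+ * Option is empty are left out entirely.
+ */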
+ private def prepareRunWithAppIds(stdAppId: Option[String], confAppId: Option[String], runId: Int = 1): Run = {
+ val additionalInfo: Map[String, String] =
+ stdAppId.map(id => Map("std_application_id" -> id)).getOrElse(Map.empty) ++
+ confAppId.map(id => Map("conform_application_id" -> id)).getOrElse(Map.empty)
+
+ val metadata = RunFactory.getDummyMetadata(additionalInfo = additionalInfo)
+ val controlMeasure = RunFactory.getDummyControlMeasure(metadata = metadata)
+ RunFactory.getDummyRun(runId = runId, controlMeasure = controlMeasure)
+ }
+}
diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/repositories/v3/RunMongoRepositoryV3Test.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/repositories/v3/RunMongoRepositoryV3Test.scala
new file mode 100644
index 000000000..a25731029
--- /dev/null
+++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/repositories/v3/RunMongoRepositoryV3Test.scala
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.enceladus.rest_api.repositories.v3
+
+import org.mongodb.scala.bson.BsonDocument
+import org.mongodb.scala.model.Filters
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class RunMongoRepositoryV3Test extends AnyFlatSpec with Matchers {
+ import RunMongoRepositoryV3._
+
+ behavior of "RunMongoRepositoryV3"
+
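+ // combineFilters treats an empty BSON document as a neutral element and
+ // combines two non-empty filters with Filters.and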
+ it should "combineFilters" in {
+ RunMongoRepositoryV3.combineFilters(BsonDocument(), BsonDocument()) shouldBe emptyBsonFilter
+ RunMongoRepositoryV3.combineFilters(Filters.eq("a", 1), BsonDocument()) shouldBe Filters.eq("a", 1)
+ RunMongoRepositoryV3.combineFilters(BsonDocument(), Filters.eq("b", 0)) shouldBe Filters.eq("b", 0)
+ RunMongoRepositoryV3.combineFilters(Filters.eq("a", 3), Filters.gte("c", -1)) shouldBe
+ Filters.and(Filters.eq("a", 3), Filters.gte("c", -1))
+ }
+
+}