#1692 Filtering runs by startDate fixed: the comparison now uses LocalDate/ISODate instead of the previous lexicographical string comparison, which does not work for DD-MM-YYYY dates. Controller input changed accordingly: clients now supply startDate as YYYY-MM-DD instead of DD-MM-YYYY.

Tests added + existing adjusted.
dk1844 committed Jun 15, 2022
1 parent 4571415 commit b252ecf
Showing 6 changed files with 156 additions and 81 deletions.
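
As a standalone illustration of the bug being fixed (not part of the commit): comparing DD-MM-YYYY strings lexicographically orders them by day-of-month first, so date-range checks give wrong answers, while parsed LocalDate values compare chronologically.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object LexicographicVsLocalDate extends App {
  // Lexicographic comparison looks at the day digits first, so "30-01-2022"
  // sorts after "20-05-2022" even though January precedes May.
  println("30-01-2022".compareTo("20-05-2022") >= 0) // true -- wrong chronologically

  // Parsing to LocalDate and comparing gives the correct chronological answer.
  val fmt = DateTimeFormatter.ofPattern("dd-MM-yyyy")
  val a = LocalDate.parse("30-01-2022", fmt)
  val b = LocalDate.parse("20-05-2022", fmt)
  println(!a.isBefore(b)) // false -- 30 Jan 2022 is before 20 May 2022
}
```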
@@ -30,6 +30,7 @@ import za.co.absa.enceladus.rest_api.models.RunSummary
import za.co.absa.enceladus.rest_api.services.v3.RunServiceV3

import java.net.URI
import java.time.LocalDate
import java.util.Optional
import java.util.concurrent.CompletableFuture
import javax.servlet.http.HttpServletRequest
@@ -58,7 +59,7 @@ class RunControllerV3 @Autowired()(runService: RunServiceV3) extends BaseControl
"You may only supply one of [startDate|sparkAppId|uniqueId].")

runService.getLatestOfEachRunSummary(
startDate = startDate.toScalaOption,
startDate = parseYmdDate(startDate),
sparkAppId = sparkAppId.toScalaOption,
uniqueId = uniqueId.toScalaOption
)
@@ -72,7 +73,7 @@ class RunControllerV3 @Autowired()(runService: RunServiceV3) extends BaseControl
@RequestParam startDate: Optional[String]): CompletableFuture[Seq[RunSummary]] = {
runService.getLatestOfEachRunSummary(
datasetName = Some(datasetName),
startDate = startDate.toScalaOption
startDate = parseYmdDate(startDate)
)
}

@@ -82,7 +83,11 @@ class RunControllerV3 @Autowired()(runService: RunServiceV3) extends BaseControl
def getSummariesByDatasetNameAndVersion(@PathVariable datasetName: String,
@PathVariable datasetVersion: Int,
@RequestParam startDate: Optional[String]): CompletableFuture[Seq[RunSummary]] = {
runService.getRunSummaries(Some(datasetName), Some(datasetVersion), startDate.toScalaOption)
runService.getRunSummaries(
datasetName = Some(datasetName),
datasetVersion = Some(datasetVersion),
startDate = parseYmdDate(startDate)
)
}

@PostMapping(Array("/{datasetName}/{datasetVersion}"))
@@ -153,11 +158,11 @@ class RunControllerV3 @Autowired()(runService: RunServiceV3) extends BaseControl
@RequestBody newCheckpoint: Checkpoint,
request: HttpServletRequest): CompletableFuture[ResponseEntity[String]] = {
runService.addCheckpoint(datasetName, datasetVersion, runId, newCheckpoint).map { _ =>
val location: URI = ServletUriComponentsBuilder.fromRequest(request)
.path("/{cpName}")
.buildAndExpand(newCheckpoint.name)
.toUri
ResponseEntity.created(location).body(s"Checkpoint '${newCheckpoint.name}' added.")
val location: URI = ServletUriComponentsBuilder.fromRequest(request)
.path("/{cpName}")
.buildAndExpand(newCheckpoint.name)
.toUri
ResponseEntity.created(location).body(s"Checkpoint '${newCheckpoint.name}' added.")
}
}

@@ -202,4 +207,13 @@ class RunControllerV3 @Autowired()(runService: RunServiceV3) extends BaseControl
}
}

protected def parseYmdDate(dateOptStr: Optional[String]): Option[LocalDate] = {
dateOptStr.toScalaOption.map { dateStr =>
Try(LocalDate.parse(dateStr)) match {
case Success(parsed) => parsed
case Failure(e) => throw new IllegalArgumentException(s"Could not parse YYYY-MM-DD date from $dateStr: ${e.getMessage}")
}
}
}

}
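
A hedged usage sketch of the new parseYmdDate helper above: this is a standalone approximation (the committed helper uses the project's Optional-to-Option conversion), showing the three outcomes a caller sees.

```scala
import java.time.LocalDate
import java.util.Optional
import scala.util.{Failure, Success, Try}

object ParseYmdDateSketch extends App {
  // Standalone approximation of RunControllerV3.parseYmdDate, for illustration only.
  def parseYmdDate(dateOptStr: Optional[String]): Option[LocalDate] =
    if (!dateOptStr.isPresent) None
    else Try(LocalDate.parse(dateOptStr.get)) match { // LocalDate.parse defaults to ISO-8601 (yyyy-MM-dd)
      case Success(parsed) => Some(parsed)
      case Failure(e) => throw new IllegalArgumentException(
        s"Could not parse YYYY-MM-DD date from ${dateOptStr.get}: ${e.getMessage}")
    }

  println(parseYmdDate(Optional.empty[String]()))        // None -> no startDate filter applied
  println(parseYmdDate(Optional.of("2022-05-20")))       // Some(2022-05-20)
  println(Try(parseYmdDate(Optional.of("20-05-2022"))))  // Failure(IllegalArgumentException) -- old DD-MM-YYYY format rejected
}
```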
@@ -15,7 +15,7 @@

package za.co.absa.enceladus.rest_api.repositories.v3

import org.mongodb.scala.MongoDatabase
import org.mongodb.scala.{Document, MongoDatabase}
import org.mongodb.scala.bson.BsonDocument
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Aggregates._
@@ -26,7 +26,9 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Repository
import za.co.absa.enceladus.rest_api.models.RunSummary
import za.co.absa.enceladus.rest_api.repositories.RunMongoRepository
import za.co.absa.enceladus.rest_api.repositories.v3.RunMongoRepositoryV3.emptyBsonFilter

import java.time.LocalDate
import scala.concurrent.Future


@@ -45,37 +47,38 @@ class RunMongoRepositoryV3 @Autowired()(mongoDb: MongoDatabase) extends RunMongo
*/
def getRunSummariesLatestOfEach(datasetName: Option[String] = None,
datasetVersion: Option[Int] = None,
startDate: Option[String] = None,
startDate: Option[LocalDate] = None,
sparkAppId: Option[String] = None,
uniqueId: Option[String] = None
): Future[Seq[RunSummary]] = {
val exclusiveOptsFilter: Option[Bson] = (startDate, sparkAppId, uniqueId) match {
case (None, None, None) => None
case (Some(startDate), None, None) => Some(startDateFromFilter(startDate))
case (None, Some(sparkAppId), None) => Some(sparkIdFilter(sparkAppId))
case (None, None, Some(uniqueId)) => Some(Filters.eq("uniqueId", uniqueId))
val exclusiveFilterStage: Seq[Bson] = (startDate, sparkAppId, uniqueId) match {
case (None, None, None) => Seq()
case (Some(startDate), None, None) => startDateFilterAggStages(startDate)
case (None, Some(sparkAppId), None) => Seq(filter(sparkIdFilter(sparkAppId)))
case (None, None, Some(uniqueId)) => Seq(filter(Filters.eq("uniqueId", uniqueId)))
case _ => throw new IllegalArgumentException("At most 1 filter of [startDate|sparkAppId|uniqueId] is allowed!")
}

val datasetFilter: Option[Bson] = datasetNameVersionOptFilter(datasetName, datasetVersion)
val combinedFilter: Bson = combineOptFilters(exclusiveOptsFilter, datasetFilter)

val pipeline = Seq(
filter(combinedFilter),
sort(descending("runId")), // this results in Accumulator.first to pickup max version
group(
// the fields are specifically selected for RunSummary usage
id = BsonDocument("""{"dataset": "$dataset", "datasetVersion": "$datasetVersion"}"""),
Accumulators.first("datasetName", "$dataset"),
Accumulators.first("datasetVersion", "$datasetVersion"),
Accumulators.first("runId", "$runId"),
Accumulators.first("status", "$runStatus.status"),
Accumulators.first("startDateTime", "$startDateTime"),
Accumulators.first("runUniqueId", "$uniqueId")
),
project(fields(excludeId())), // id composed of dsName+dsVer no longer needed
sort(ascending("datasetName", "datasetVersion"))
)
val datasetFilter: Bson = datasetNameVersionFilter(datasetName, datasetVersion)

val pipeline =
Seq(filter(datasetFilter)) ++
exclusiveFilterStage ++ // may be empty
Seq(
sort(descending("runId")), // this results in Accumulator.first to pickup max version
group(
// the fields are specifically selected for RunSummary usage
id = BsonDocument("""{"dataset": "$dataset", "datasetVersion": "$datasetVersion"}"""),
Accumulators.first("datasetName", "$dataset"),
Accumulators.first("datasetVersion", "$datasetVersion"),
Accumulators.first("runId", "$runId"),
Accumulators.first("status", "$runStatus.status"),
Accumulators.first("startDateTime", "$startDateTime"),
Accumulators.first("runUniqueId", "$uniqueId")
),
project(fields(excludeId())), // id composed of dsName+dsVer no longer needed
sort(ascending("datasetName", "datasetVersion"))
)

collection
.aggregate[RunSummary](pipeline)
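
For orientation, roughly the pipeline this method builds for a hypothetical call such as getRunSummariesLatestOfEach(datasetName = Some("dataset1"), startDate = Some(LocalDate.parse("2022-05-20"))); this is an illustrative reconstruction of the stages, not generated driver output, and the real code assembles them with the Aggregates/Filters builders shown above.

```scala
import org.mongodb.scala.Document

// Illustrative sketch only; values correspond to the hypothetical call above.
val pipelineSketch: Seq[Document] = Seq(
  Document("""{ "$match": { "dataset": "dataset1" } }"""),
  Document("""{ "$addFields": { "startDateTimeTyped": { "$dateFromString": {
                 "dateString": "$startDateTime", "format": "%d-%m-%Y %H:%M:%S %z" } } } }"""),
  Document("""{ "$match": { "startDateTimeTyped": { "$gte": { "$date": "2022-05-20T00:00:00Z" } } } }"""),
  Document("""{ "$sort": { "runId": -1 } }"""),
  Document("""{ "$group": {
                 "_id": { "dataset": "$dataset", "datasetVersion": "$datasetVersion" },
                 "datasetName":    { "$first": "$dataset" },
                 "datasetVersion": { "$first": "$datasetVersion" },
                 "runId":          { "$first": "$runId" },
                 "status":         { "$first": "$runStatus.status" },
                 "startDateTime":  { "$first": "$startDateTime" },
                 "runUniqueId":    { "$first": "$uniqueId" } } }"""),
  Document("""{ "$project": { "_id": 0 } }"""),
  Document("""{ "$sort": { "datasetName": 1, "datasetVersion": 1 } }""")
)
```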
@@ -84,47 +87,67 @@ class RunMongoRepositoryV3 @Autowired()(mongoDb: MongoDatabase) extends RunMongo

def getRunSummaries(datasetName: Option[String] = None,
datasetVersion: Option[Int] = None,
startDate: Option[String] = None): Future[Seq[RunSummary]] = {
startDate: Option[LocalDate] = None): Future[Seq[RunSummary]] = {

val dateFilter: Option[Bson] = startDate.map(date => startDateFromFilter(date))
val datasetFilter: Option[Bson] = datasetNameVersionOptFilter(datasetName, datasetVersion)
val combinedFilter: Bson = combineOptFilters(dateFilter, datasetFilter)
val dateFilterStages: Seq[Bson] = startDate.map(startDateFilterAggStages).getOrElse(Seq.empty)
val datasetFilter: Bson = datasetNameVersionFilter(datasetName, datasetVersion)

val pipeline = Seq(
filter(combinedFilter),
summaryProjection,
sort(ascending("datasetName", "datasetVersion", "runId"))
)
val pipeline =
Seq(filter(datasetFilter)) ++
dateFilterStages ++ // may be empty
Seq(
summaryProjection,
sort(ascending("datasetName", "datasetVersion", "runId"))
)

collection
.aggregate[RunSummary](pipeline)
.toFuture()
}

protected def startDateFromFilter(startDate: String): Bson = {
// todo use LocalDateTime?
Filters.gte("startDateTime", startDate)
/**
* Adds aggregation stages to create a typed version of `startDateTimeTyped` and filters on it to be >= `startDate`
*
* @param startDate
* @return
*/
protected def startDateFilterAggStages(startDate: LocalDate): Seq[Bson] = {
Seq(
addFields(Field("startDateTimeTyped",
Document(
"""{$dateFromString: {
| dateString: "$startDateTime",
| format: "%d-%m-%Y %H:%M:%S %z"
|}}""".stripMargin)
)),
filter(Filters.gte("startDateTimeTyped", startDate.atStartOfDay()))
)
}

protected def datasetNameVersionOptFilter(datasetName: Option[String], datasetVersion: Option[Int]): Option[Bson] = {
protected def datasetNameVersionFilter(datasetName: Option[String], datasetVersion: Option[Int]): Bson = {
(datasetName, datasetVersion) match {
case (None, None) => None // all entities
case (Some(datasetName), None) => Some(Filters.eq("dataset", datasetName))
case (Some(datasetName), Some(datasetVersion)) => Some(Filters.and(
case (None, None) => emptyBsonFilter // all entities
case (Some(datasetName), None) => Filters.eq("dataset", datasetName)
case (Some(datasetName), Some(datasetVersion)) => Filters.and(
Filters.eq("dataset", datasetName),
Filters.eq("datasetVersion", datasetVersion)
))
)
case _ => throw new IllegalArgumentException("Disallowed dataset name/version combination." +
"For dataset (name, version) filtering, the only allowed combinations are:" +
"(None, None), (Some, None) and (Some, Some)")
}
}

protected def combineOptFilters(optFilter1: Option[Bson], optFilter2: Option[Bson]): Bson = (optFilter1, optFilter2) match {
case (None, None) => BsonDocument() // empty filter
case (Some(filter1), None) => filter1
case (None, Some(filter2)) => filter2
case (Some(filter1), Some(filter2)) => Filters.and(filter1, filter2)
}
}

object RunMongoRepositoryV3 {
val emptyBsonFilter = BsonDocument()

def combineFilters(filter1: Bson, filter2: Bson): Bson = (filter1, filter2) match {
case (doc1, doc2) if doc1 == emptyBsonFilter && doc2 == emptyBsonFilter => emptyBsonFilter
case (_, doc2) if doc2 == emptyBsonFilter => filter1
case (doc1, _) if doc1 == emptyBsonFilter => filter2
case (doc1, doc2) => Filters.and(doc1, doc2)

}
}
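
A brief usage sketch of the new companion-object helpers (illustrative only, with made-up filter values): emptyBsonFilter acts as the neutral element, so combineFilters lets callers merge optional filters without special-casing "no filter".

```scala
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Filters
import za.co.absa.enceladus.rest_api.repositories.v3.RunMongoRepositoryV3.{combineFilters, emptyBsonFilter}

object CombineFiltersSketch {
  // Both sides empty -> the shared empty filter, i.e. match everything:
  val matchAll: Bson = combineFilters(emptyBsonFilter, emptyBsonFilter)

  // One side empty -> the other filter is passed through unchanged:
  val datasetOnly: Bson = combineFilters(Filters.eq("dataset", "dataset1"), emptyBsonFilter)

  // Both non-empty -> they are AND-ed together:
  val nameAndVersion: Bson =
    combineFilters(Filters.eq("dataset", "dataset1"), Filters.eq("datasetVersion", 2))
}
```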
@@ -24,6 +24,7 @@ import za.co.absa.enceladus.rest_api.models.RunSummary
import za.co.absa.enceladus.rest_api.repositories.v3.RunMongoRepositoryV3
import za.co.absa.enceladus.rest_api.services.RunService

import java.time.LocalDate
import scala.concurrent.Future

@Service
@@ -57,7 +58,7 @@ class RunServiceV3 @Autowired()(runMongoRepository: RunMongoRepositoryV3, datase
*/
def getLatestOfEachRunSummary(datasetName: Option[String] = None,
datasetVersion: Option[Int] = None,
startDate: Option[String] = None,
startDate: Option[LocalDate] = None,
sparkAppId: Option[String] = None,
uniqueId: Option[String] = None
): Future[Seq[RunSummary]] = {
@@ -66,7 +67,7 @@

def getRunSummaries(datasetName: Option[String] = None,
datasetVersion: Option[Int] = None,
startDate: Option[String] = None): Future[Seq[RunSummary]] = {
startDate: Option[LocalDate] = None): Future[Seq[RunSummary]] = {
runMongoRepository.getRunSummaries(datasetName, datasetVersion, startDate)
}

@@ -71,22 +71,22 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
}

"latest RunSummaries are queried on startDate" in {
val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1, startDateTime = "18-05-2022 13:01:12 +0200")
val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1, startDateTime = "30-01-2000 13:01:12 +0200")
val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2, startDateTime = "22-05-2022 14:01:12 +0200")

val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "19-05-2022 15:01:12 +0200")
val dataset1ver2run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 2, startDateTime = "22-05-2022 15:01:12 +0200")
val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "23-05-2022 15:01:12 +0200")

val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1, startDateTime = "17-05-2022 13:01:12 +0200")
val dataset3ver1run1 = RunFactory.getDummyRun(dataset = "dataset3", datasetVersion = 1, runId = 1, startDateTime = "20-05-2022 13:01:12 +0200")
val dataset3ver1run1 = RunFactory.getDummyRun(dataset = "dataset3", datasetVersion = 1, runId = 1, startDateTime = "01-06-2022 13:01:12 +0200")
runFixture.add(
dataset1ver1run1, dataset1ver1run2,
dataset1ver2run1, dataset1ver2run2, dataset1ver2run3,
dataset2ver1run1, dataset3ver1run1
)

val response = sendGet[Array[RunSummary]](s"$apiUrl?startDate=20-05-2022")
val response = sendGet[Array[RunSummary]](s"$apiUrl?startDate=2022-05-20")
val expected = Array(dataset1ver1run2, dataset1ver2run3, dataset3ver1run1).map(_.toSummary)
response.getBody shouldBe expected
}
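
For context on the dummy values above, a rough java.time analogue (not the committed Mongo logic): the stored startDateTime strings keep their DD-MM-YYYY HH:mm:ss Z format, and the repository's $dateFromString/$gte stages compare them chronologically against the requested ISO startDate, which is what this test's expectations encode.

```scala
import java.time.format.DateTimeFormatter
import java.time.{LocalDate, OffsetDateTime}

object StartDateFilterAnalogue extends App {
  // Same pattern as the $dateFromString format "%d-%m-%Y %H:%M:%S %z" used by the repository.
  val storedFormat = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss Z")

  def keptSince(storedStartDateTime: String, startDate: LocalDate): Boolean = {
    val stored    = OffsetDateTime.parse(storedStartDateTime, storedFormat)
    val threshold = startDate.atStartOfDay(stored.getOffset).toOffsetDateTime
    !stored.isBefore(threshold)
  }

  val startDate = LocalDate.parse("2022-05-20") // ?startDate=2022-05-20

  println(keptSince("22-05-2022 14:01:12 +0200", startDate)) // true  -> dataset1ver1run2 is returned
  println(keptSince("30-01-2000 13:01:12 +0200", startDate)) // false -> dataset1ver1run1 is filtered out,
                                                             //          although a raw string ">=" would have kept it
}
```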
@@ -133,7 +133,7 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
val run2 = RunFactory.getDummyRun(dataset = "dataset3", uniqueId = None)
runFixture.add(run1, run2)

val response = sendGet[String](s"$apiUrl?startDate=24-05-2022")
val response = sendGet[String](s"$apiUrl?startDate=2022-05-24")
response.getBody shouldBe "[]" // empty array
}
}
@@ -171,12 +171,12 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
}

"latest RunSummaries are queried on startDate" in {
val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1, startDateTime = "18-05-2022 13:01:12 +0200")
val dataset1ver1run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 1, startDateTime = "30-01-2022 13:01:12 +0200")
val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2, startDateTime = "22-05-2022 14:01:12 +0200")

val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "19-05-2022 15:01:12 +0200")
val dataset1ver2run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 2, startDateTime = "22-05-2022 15:01:12 +0200")
val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "23-05-2022 15:01:12 +0200")
val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "01-06-2022 15:01:12 +0200")

// unrelated to dataset1:
val dataset2ver1run1 = RunFactory.getDummyRun(dataset = "dataset2", datasetVersion = 1, runId = 1, startDateTime = "17-05-2022 13:01:12 +0200")
@@ -187,7 +187,7 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
dataset2ver1run1, dataset3ver1run1
)

val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1?startDate=20-05-2022")
val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1?startDate=2022-05-20")
response.getStatusCode shouldBe HttpStatus.OK
val expected = Array(dataset1ver1run2, dataset1ver2run3).map(_.toSummary)
response.getBody shouldBe expected
@@ -211,7 +211,7 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
val run2 = RunFactory.getDummyRun(dataset = "dataset3", uniqueId = None) // unrelated to dataset1
runFixture.add(run1, run2)

val response = sendGet[String](s"$apiUrl/dataset1?startDate=24-05-2022")
val response = sendGet[String](s"$apiUrl/dataset1?startDate=2022-05-24")
response.getStatusCode shouldBe HttpStatus.OK
response.getBody shouldBe "[]" // empty array
}
@@ -239,9 +239,9 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
"return RunSummaries on combination of (startDate, dsName, and dsVersion)" in {
val dataset1ver1run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 1, runId = 2, startDateTime = "22-05-2022 14:01:12 +0200")

val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "19-05-2022 15:01:12 +0200")
val dataset1ver2run1 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 1, startDateTime = "30-01-2022 15:01:12 +0200")
val dataset1ver2run2 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 2, startDateTime = "20-05-2022 15:01:12 +0200")
val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "23-05-2022 15:01:12 +0200")
val dataset1ver2run3 = RunFactory.getDummyRun(dataset = "dataset1", datasetVersion = 2, runId = 3, startDateTime = "01-06-2022 15:01:12 +0200")

val dataset3ver1run1 = RunFactory.getDummyRun(dataset = "dataset3", datasetVersion = 1, runId = 1, startDateTime = "21-05-2022 13:01:12 +0200")

@@ -251,7 +251,7 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
dataset3ver1run1
)

val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1/2?startDate=20-05-2022")
val response = sendGet[Array[RunSummary]](s"$apiUrl/dataset1/2?startDate=2022-05-20")
response.getStatusCode shouldBe HttpStatus.OK

val expected = List(dataset1ver2run2, dataset1ver2run3).map(_.toSummary)