
Merge branch 'main' into update-grackle
tpolecat committed Oct 16, 2023
2 parents d21d94e + a9e48ae commit 0b3efc2
Showing 8 changed files with 308 additions and 29 deletions.
@@ -36,7 +36,7 @@ trait DatasetTable[F[_]] extends BaseMapping[F] {

object Time {
val Start: ColumnRef = col("c_start_time", core_timestamp.opt)
-    val End: ColumnRef = col("c_start_time", core_timestamp.opt)
+    val End: ColumnRef = col("c_end_time", core_timestamp.opt)
}
}

@@ -15,6 +15,7 @@ import lucuma.core.enums.DatasetQaState
import lucuma.core.model.User
import lucuma.core.model.sequence.Dataset
import lucuma.core.model.sequence.Step
import lucuma.core.util.Timestamp
import lucuma.odb.data.Nullable
import lucuma.odb.graphql.input.DatasetPropertiesInput
import lucuma.odb.util.Codecs.*
@@ -35,6 +36,16 @@ sealed trait DatasetService[F[_]] {
SET: DatasetPropertiesInput,
which: AppliedFragment
)(using Transaction[F]): F[List[Dataset.Id]]

  /** Sets the dataset's start time, clearing any previously recorded end time. */
  def setStartTime(
datasetId: Dataset.Id,
time: Timestamp
)(using Transaction[F]): F[Unit]

  /** Sets the dataset's end time. */
  def setEndTime(
datasetId: Dataset.Id,
time: Timestamp
)(using Transaction[F]): F[Unit]
}

object DatasetService {
@@ -103,7 +114,19 @@ object DatasetService {
}
}

-    }
+      def setStartTime(
+        datasetId: Dataset.Id,
+        time: Timestamp
+      )(using Transaction[F]): F[Unit] =
+        session.execute(Statements.SetStartTime)(time, datasetId).void
+
+      def setEndTime(
+        datasetId: Dataset.Id,
+        time: Timestamp
+      )(using Transaction[F]): F[Unit] =
+        session.execute(Statements.SetEndTime)(time, datasetId).void
+
+    }

object Statements {

@@ -144,5 +167,20 @@ object DatasetService {
void" RETURNING c_dataset_id"
}
}

val SetStartTime: Command[(Timestamp, Dataset.Id)] =
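      // Starting (or restarting) a dataset clears any previously recorded end time.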
sql"""
UPDATE t_dataset
SET c_start_time = $core_timestamp,
c_end_time = NULL
WHERE c_dataset_id = $dataset_id
""".command

val SetEndTime: Command[(Timestamp, Dataset.Id)] =
sql"""
UPDATE t_dataset
SET c_end_time = $core_timestamp
WHERE c_dataset_id = $dataset_id
""".command
}
}
@@ -5,10 +5,12 @@ package lucuma.odb.service

import cats.data.EitherT
import cats.effect.Concurrent
import cats.syntax.applicative.*
import cats.syntax.applicativeError.*
import cats.syntax.bifunctor.*
import cats.syntax.either.*
import cats.syntax.eq.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import lucuma.core.enums.DatasetStage
import lucuma.core.enums.SequenceCommand
@@ -78,22 +80,43 @@ object ExecutionEventService {
override def insertDatasetEvent(
datasetId: Dataset.Id,
datasetStage: DatasetStage
-    )(using Transaction[F]): F[ExecutionEventService.InsertEventResponse] = {
+    )(using xa: Transaction[F]): F[ExecutionEventService.InsertEventResponse] = {

import InsertEventResponse.*

-      val insert: F[Either[DatasetNotFound, (ExecutionEvent.Id, Timestamp)]] =
+      val insertEvent: F[Either[DatasetNotFound, (ExecutionEvent.Id, Timestamp)]] =
session
.option(Statements.InsertDatasetEvent)(datasetId, datasetStage, datasetId)
.map(_.toRight(DatasetNotFound(datasetId)))
.recover {
case SqlState.ForeignKeyViolation(_) => DatasetNotFound(datasetId).asLeft
}

      // Best-effort attempt to set the dataset start/end time according to the
      // stage. This can fail (leaving the timestamps unchanged) if there is an
      // end event but no start, or if the end time comes before the start.
def setDatasetTime(t: Timestamp): F[Unit] = {
def setWith(f: (Dataset.Id, Timestamp) => F[Unit]): F[Unit] =
for {
s <- xa.savepoint
_ <- f(datasetId, t).recoverWith {
case SqlState.CheckViolation(_) => xa.rollback(s).void
}
} yield ()

// StartObserve signals the start of the dataset, EndWrite the end.
datasetStage match {
case DatasetStage.StartObserve => setWith(services.datasetService.setStartTime)
case DatasetStage.EndWrite => setWith(services.datasetService.setEndTime)
case _ => ().pure
}
}
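      // The CheckViolation recovery above presumes a database-level guard on the
      // dataset timestamp columns. A minimal sketch of the kind of constraint
      // assumed here (hypothetical; the actual migration is not part of this diff):
      //
      //   ALTER TABLE t_dataset ADD CHECK (
      //     c_end_time IS NULL OR
      //     (c_start_time IS NOT NULL AND c_start_time <= c_end_time)
      //   );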

(for {
_ <- EitherT.fromEither(checkUser(NotAuthorized.apply))
-        e    <- EitherT(insert).leftWiden[InsertEventResponse]
+        e    <- EitherT(insertEvent).leftWiden[InsertEventResponse]
(eid, time) = e
_ <- EitherT.liftF(setDatasetTime(time))
} yield Success(eid, time)).merge

}
@@ -129,13 +129,15 @@ object GuideEnvironmentService {
constraints: ConstraintSet,
posAngleConstraint: PosAngleConstraint,
optWavelength: Option[Wavelength],
-    fpu: Either[GmosNorthFpu, GmosSouthFpu],
+    optFpu: Option[Either[GmosNorthFpu, GmosSouthFpu]],
explicitBase: Option[Coordinates]
) {
-    def agsParams: AgsParams = AgsParams.GmosAgsParams(fpu.some, PortDisposition.Side)
+    def agsParams: AgsParams = AgsParams.GmosAgsParams(optFpu, PortDisposition.Side)
def wavelength: Either[Error, Wavelength] =
optWavelength.toRight(Error.GeneralError(s"No wavelength defined for observation $id."))
-    def site: Site = fpu.fold(_ => Site.GN, _ => Site.GS)
+    def fpu: Either[Error, Either[GmosNorthFpu, GmosSouthFpu]] =
+      optFpu.toRight(Error.GeneralError(s"No configuration defined for observation $id."))
+    def site: Either[Error, Site] = fpu.map(_.fold(_ => Site.GN, _ => Site.GS))
}

def instantiate[F[_]: Concurrent](
@@ -317,6 +319,7 @@
): F[Either[Error, GuideEnvironment]] =
(for {
obsInfo <- EitherT(getObservationInfo(pid, oid))
site <- EitherT.fromEither(obsInfo.site)
asterism <- EitherT(getAsterism(pid, oid))
tracking = ObjectTracking.fromAsterism(asterism)
tAndO <- EitherT(getTimeAndOffsets(pid, oid))
@@ -345,7 +348,7 @@
)
)
positions <- EitherT.fromEither(
-        getPositions(oid, obsInfo.site, obsInfo.posAngleConstraint, offsets, tracking, obsTime, obsDuration)
+        getPositions(oid, site, obsInfo.posAngleConstraint, offsets, tracking, obsTime, obsDuration)
)
usable <- EitherT.fromEither(
processCandidates(obsInfo, baseCoords, scienceCoords, positions, candidates)
@@ -437,22 +440,22 @@
)
.toRight(s"Invalid elevation range in observation $id.")

-    val fpu: Either[String, Either[GmosNorthFpu, GmosSouthFpu]] =
+    val fpu: Option[Either[GmosNorthFpu, GmosSouthFpu]] =
(nFpu, sFpu) match {
-        case (Some(north), None) => north.asLeft.asRight
-        case (None, Some(south)) => south.asRight.asRight
-        case _ => s"No configuration for observation $id.".asLeft
+        case (Some(north), None) => north.asLeft.some
+        case (None, Some(south)) => south.asRight.some
+        case _ => none
}

val explicitBase: Option[Coordinates] =
(ra, dec).mapN(Coordinates(_, _))

-    (elevRange, fpu).mapN((elev, f) =>
+    elevRange.map(elev =>
ObservationInfo(id,
ConstraintSet(image, cloud, sky, water, elev),
paConstraint,
wavelength,
-        f,
+        fpu,
explicitBase
)
)
@@ -400,17 +400,37 @@ object SequenceService {
$step_type
""".command

    /**
     * Selects completed step records for a particular observation. A completed
     * step is one whose completion time has been set by the reception of an
     * EndStep event, and which has no datasets that are still pending or that
     * have a QA state other than Pass.
     */
val SelectCompletedStepRecordsForObs: Query[Observation.Id, (Atom.Id, NonNegShort, SequenceType, Step.Id)] =
(sql"""
SELECT
a.c_atom_id,
a.c_step_count,
a.c_sequence_type,
s.c_step_id
-      FROM t_step_record s
-      INNER JOIN t_atom_record a ON a.c_atom_id = s.c_atom_id
-      WHERE """ ~> sql"""a.c_observation_id = $observation_id AND s.c_completed IS NOT NULL ORDER BY s.c_completed"""
-    ).query(atom_id *: int2_nonneg *: sequence_type *: step_id)
+      FROM
+        t_step_record s
+      INNER JOIN
+        t_atom_record a
+      ON
+        a.c_atom_id = s.c_atom_id
+      WHERE """ ~> sql"""a.c_observation_id = $observation_id AND s.c_completed IS NOT NULL""" <~ sql"""
+      AND NOT EXISTS (
+        SELECT 1
+        FROM t_dataset d
+        WHERE
+          d.c_step_id = s.c_step_id
+          AND (
+            d.c_end_time IS NULL
+            OR (d.c_qa_state IS NOT NULL AND d.c_qa_state <> 'Pass'::e_dataset_qa_state)
+          )
+        )
+      ORDER BY s.c_completed
+    """).query(atom_id *: int2_nonneg *: sequence_type *: step_id)

def encodeColumns(prefix: Option[String], columns: List[String]): String =
columns.map(c => s"${prefix.foldMap(_ + ".")}$c").intercalate(",\n")
@@ -6,17 +6,22 @@ package mutation

import cats.effect.IO
import cats.syntax.either.*
import cats.syntax.eq.*
import cats.syntax.option.*
import cats.syntax.traverse.*
import io.circe.Json
import io.circe.literal.*
import lucuma.core.enums.DatasetStage
import lucuma.core.model.Observation
import lucuma.core.model.User
import lucuma.core.model.sequence.Dataset
import lucuma.core.util.Timestamp
import lucuma.odb.data.ObservingModeType


class addDatasetEvent extends OdbSuite {

val mode: ObservingModeType = ObservingModeType.GmosNorthLongSlit
val staff: User = TestUsers.Standard.staff(nextId, nextId)

override lazy val validUsers: List[User] = List(staff)
@@ -71,7 +76,7 @@ class addDatasetEvent extends OdbSuite {
"""

addDatasetEventTest(
-      ObservingModeType.GmosNorthLongSlit,
+      mode,
staff,
"N18630101S0001.fits",
did => query(did),
@@ -116,7 +121,7 @@ class addDatasetEvent extends OdbSuite {
"""

addDatasetEventTest(
-      ObservingModeType.GmosNorthLongSlit,
+      mode,
staff,
"N18630101S0002.fits",
did => query(did),
@@ -140,7 +145,7 @@
}

test("addDatasetEvent - unknown dataset") {
-    def query: String =
+    val query: String =
s"""
mutation {
addDatasetEvent(input: {
@@ -157,7 +162,7 @@
"""

addDatasetEventTest(
-      ObservingModeType.GmosNorthLongSlit,
+      mode,
staff,
"N18630101S0003.fits",
_ => query,
@@ -166,4 +171,78 @@

}

private def addEvent(did: Dataset.Id, stage: DatasetStage): IO[Timestamp] =
query(
staff,
s"""
mutation {
addDatasetEvent(input: {
datasetId: "$did",
datasetStage: ${stage.tag.toUpperCase}
}) {
event {
received
}
}
}
"""
).map { json =>
json.hcursor.downFields("addDatasetEvent", "event", "received").require[Timestamp]
}

private def timestamps(did: Dataset.Id): IO[(Option[Timestamp], Option[Timestamp])] =
query(
staff,
s"""
query {
dataset(datasetId: "$did") {
start
end
}
}
"""
).map { json =>
val d = json.hcursor.downField("dataset")
val s = d.downField("start").require[Option[Timestamp]]
val e = d.downField("end").require[Option[Timestamp]]
(s, e)
}

private def timeTest(file: String, stages: DatasetStage*): IO[Unit] = {
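    // Mirrors the server-side rules sketched above: StartObserve records a start
    // time and clears any end time; EndWrite records an end time only when a
    // start time is already present.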
def expected(times: List[Timestamp]): (Option[Timestamp], Option[Timestamp]) =
times.zip(stages).foldLeft((Option.empty[Timestamp], Option.empty[Timestamp])) { case ((start, end), (time, stage)) =>
if (stage === DatasetStage.StartObserve) (time.some, none)
else if ((stage === DatasetStage.EndWrite) && start.isDefined) (start, time.some)
else (start, end)
}

for {
ids <- recordDataset(mode, staff, file)
(oid, did) = ids
es <- stages.toList.traverse(addEvent(did, _))
ex = expected(es)
ts <- timestamps(did)
} yield assertEquals(ts, ex)
}

test("addDatasetEvent - no start time") {
timeTest("N18630101S0004.fits", DatasetStage.StartWrite)
}

test("addDatasetEvent - start") {
timeTest("N18630101S0005.fits", DatasetStage.StartObserve)
}

test("addDatasetEvent - start, end") {
timeTest("N18630101S0006.fits", DatasetStage.StartObserve, DatasetStage.EndWrite)
}

test("addDatasetEvent - end, no start") {
timeTest("N18630101S0007.fits", DatasetStage.EndWrite)
}

test("addDatasetEvent - start, end, start") {
timeTest("N18630101S0008.fits", DatasetStage.StartObserve, DatasetStage.EndWrite, DatasetStage.StartObserve)
}

}
