From 78894d72d1d60bc8d33e3104dd7ff0cada71c094 Mon Sep 17 00:00:00 2001 From: Teo Mrnjavac Date: Fri, 1 Mar 2024 14:51:04 +0100 Subject: [PATCH] [core] Add support for dcs_pfr/sor_grace_period variables --- core/integration/dcs/plugin.go | 100 ++++++++++++++++++++++++++++++++- 1 file changed, 98 insertions(+), 2 deletions(-) diff --git a/core/integration/dcs/plugin.go b/core/integration/dcs/plugin.go index a6a1578c..a3ab4a8e 100644 --- a/core/integration/dcs/plugin.go +++ b/core/integration/dcs/plugin.go @@ -402,8 +402,54 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } + // We acquire a grace period during which we hope that DCS will become compatible with the operation. + // During this period we'll keep checking our internal state for op compatibility as reported by DCS at 1Hz, + // and if we don't get a compatible state within the grace period, we declare the operation failed. + pfrGracePeriod := time.Duration(0) + pfrGracePeriodS, ok := varStack["dcs_pfr_grace_period"] + if ok { + pfrGracePeriod, err = time.ParseDuration(pfrGracePeriodS) + if err != nil { + log.WithError(err). + WithField("level", infologger.IL_Ops). + WithField("partition", envId). + WithField("call", "PrepareForRun"). + Warnf("cannot parse DCS PFR grace period, assuming 0 seconds") + } + } else { + log.WithField("level", infologger.IL_Ops). + WithField("partition", envId). + WithField("call", "PrepareForRun"). + Info("DCS PFR grace period not set, defaulting to 0 seconds") + } + + pfrGraceTimeout := time.Now().Add(pfrGracePeriod) + isCompatibleWithOperation := false + knownDetectorStates := p.getDetectorsPfrAvailability(dcsDetectors) - isCompatibleWithOperation, err := knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE) + isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE) + + for { + if isCompatibleWithOperation { + break + } else { + log.WithField("level", infologger.IL_Ops). + WithField("partition", envId). + WithField("call", "PrepareForRun"). + WithField("grace_period", pfrGracePeriod.String()). + WithField("remaining_grace_period", pfrGraceTimeout.Sub(time.Now()).String()). + Infof("waiting for DCS operation readiness: %s", err.Error()) + time.Sleep(1 * time.Second) + } + + if time.Now().Before(pfrGraceTimeout) { + knownDetectorStates = p.getDetectorsPfrAvailability(dcsDetectors) + isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE) + } else { + break + } + } + if !isCompatibleWithOperation { log.WithError(err). WithField("level", infologger.IL_Ops). @@ -422,6 +468,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Warnf("cannot determine PFR readiness: %s", err.Error()) } + // By now the DCS must be in a compatible state, so we proceed with gathering params for the operation + log.WithField("partition", envId). WithField("level", infologger.IL_Ops). Infof("performing DCS PFR for detectors: %s", strings.Join(dcsDetectors.EcsDetectorsSlice(), " ")) @@ -790,8 +838,54 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } + // We acquire a grace period during which we hope that DCS will become compatible with the operation. + // During this period we'll keep checking our internal state for op compatibility as reported by DCS at 1Hz, + // and if we don't get a compatible state within the grace period, we declare the operation failed. + sorGracePeriod := time.Duration(0) + sorGracePeriodS, ok := varStack["dcs_sor_grace_period"] + if ok { + sorGracePeriod, err = time.ParseDuration(sorGracePeriodS) + if err != nil { + log.WithError(err). + WithField("level", infologger.IL_Ops). + WithField("partition", envId). + WithField("call", "StartOfRun"). + Warnf("cannot parse DCS SOR grace period, assuming 0 seconds") + } + } else { + log.WithField("level", infologger.IL_Ops). + WithField("partition", envId). + WithField("call", "StartOfRun"). + Info("DCS SOR grace period not set, defaulting to 0 seconds") + } + + sorGraceTimeout := time.Now().Add(sorGracePeriod) + isCompatibleWithOperation := false + knownDetectorStates := p.getDetectorsSorAvailability(dcsDetectors) - isCompatibleWithOperation, err := knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE) + isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE) + + for { + if isCompatibleWithOperation { + break + } else { + log.WithField("level", infologger.IL_Ops). + WithField("partition", envId). + WithField("call", "StartOfRun"). + WithField("grace_period", sorGracePeriod.String()). + WithField("remaining_grace_period", sorGraceTimeout.Sub(time.Now()).String()). + Infof("waiting for DCS operation readiness: %s", err.Error()) + time.Sleep(1 * time.Second) + } + + if time.Now().Before(sorGraceTimeout) { + knownDetectorStates = p.getDetectorsSorAvailability(dcsDetectors) + isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE) + } else { + break + } + } + if !isCompatibleWithOperation { log.WithError(err). WithField("level", infologger.IL_Ops). @@ -810,6 +904,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Warnf("cannot determine SOR readiness: %s", err.Error()) } + // By now the DCS must be in a compatible state, so we proceed with gathering params for the operation + log.WithField("partition", envId). WithField("level", infologger.IL_Ops). WithField("run", runNumber64).