Skip to content

Commit

Permalink
[core] Reformat DCS client data output
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Jan 18, 2024
1 parent 7d484d7 commit a4aa3a4
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 11 deletions.
57 changes: 49 additions & 8 deletions core/integration/dcs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
const (
DCS_DIAL_TIMEOUT = 2 * time.Hour
DCS_GENERAL_OP_TIMEOUT = 45 * time.Second
DCS_TIME_FORMAT = "2006-01-02 15:04:05.000"
)

type Plugin struct {
Expand All @@ -73,6 +74,8 @@ type DCSDetectorOpAvailabilityMap map[dcspb.Detector]dcspb.DetectorState

type DCSDetectorInfoMap map[dcspb.Detector]*dcspb.DetectorInfo

type ECSDetectorInfoMap map[string]ECSDetectorInfo

func NewPlugin(endpoint string) integration.Plugin {
u, err := url.Parse(endpoint)
if err != nil {
Expand Down Expand Up @@ -121,7 +124,7 @@ func (p *Plugin) GetData(_ []any) string {
environmentIds := environment.ManagerInstance().Ids()

outMap := make(map[string]interface{})
outMap["partitions"] = p.partitionStatesForEnvs(environmentIds)
outMap["partitions"] = p.pendingEorsForEnvs(environmentIds)

p.detectorMapMu.RLock()
outMap["detectors"] = p.detectorMap.ToEcsDetectors()
Expand All @@ -139,20 +142,50 @@ func (p *Plugin) GetEnvironmentsData(environmentIds []uid.ID) map[uid.ID]string
return nil
}

out := p.partitionStatesForEnvs(environmentIds)
envMan := environment.ManagerInstance()
pendingEors := p.pendingEorsForEnvs(environmentIds)

out := make(map[uid.ID]string)

detectorMap := p.detectorMap.ToEcsDetectors()
for _, envId := range environmentIds {
env, err := envMan.Environment(envId)
if err != nil {
log.WithField("partition", envId).
WithError(err).
Error("DCS client cannot acquire environment")
continue
}

includedDetectors := env.GetActiveDetectors().StringList()
includedDetectorsMap := detectorMap.Filtered(includedDetectors)

pi := PartitionInfo{
Detectors: includedDetectorsMap,
}
if pendingEorStatus, pendingEorExists := pendingEors[envId]; pendingEorExists {
pi.SorSuccessful = pendingEorStatus
}

marshalled, err := json.Marshal(pi)
if err != nil {
continue
}
out[envId] = string(marshalled[:])
}

return out
}

func (p *Plugin) GetEnvironmentsShortData(environmentIds []uid.ID) map[uid.ID]string {
return p.GetEnvironmentsData(environmentIds)
return nil
}

func (p *Plugin) partitionStatesForEnvs(envIds []uid.ID) map[uid.ID]string {
out := make(map[uid.ID]string)
func (p *Plugin) pendingEorsForEnvs(envIds []uid.ID) map[uid.ID]bool {
out := make(map[uid.ID]bool)
for _, envId := range envIds {
if _, ok := p.pendingEORs[envId.String()]; ok {
out[envId] = "SOR SUCCESSFUL"
}
_, pendingEorExists := p.pendingEORs[envId.String()]
out[envId] = pendingEorExists
}
return out
}
Expand All @@ -168,6 +201,10 @@ func (p *Plugin) updateLastKnownDetectorStates(detectorMatrix []*dcspb.DetectorI
} else {
if detInfo.State != dcspb.DetectorState_NULL_STATE {
p.detectorMap[dcsDet].State = detInfo.State
timestamp, err := time.Parse(DCS_TIME_FORMAT, detInfo.Timestamp)
if err == nil {
p.detectorMap[dcsDet].Timestamp = fmt.Sprintf("%d", timestamp.UnixMilli())
}
}
}
}
Expand All @@ -184,6 +221,10 @@ func (p *Plugin) updateDetectorOpAvailabilities(detectorMatrix []*dcspb.Detector
} else {
p.detectorMap[dcsDet].PfrAvailability = detInfo.PfrAvailability
p.detectorMap[dcsDet].SorAvailability = detInfo.SorAvailability
timestamp, err := time.Parse(DCS_TIME_FORMAT, detInfo.Timestamp)
if err == nil {
p.detectorMap[dcsDet].Timestamp = fmt.Sprintf("%d", timestamp.UnixMilli())
}
}
}
}
Expand Down
45 changes: 42 additions & 3 deletions core/integration/dcs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ import (
dcspb "github.com/AliceO2Group/Control/core/integration/dcs/protos"
)

type PartitionInfo struct {
SorSuccessful bool
Detectors ECSDetectorInfoMap
}

type ECSDetectorInfo struct {
State string
Timestamp string
AllowedRunTypes []string
PfrAvailability string
SorAvailability string
}

func (d DCSDetectors) ToStringSlice() (sslice []string) {
if d == nil {
return
Expand Down Expand Up @@ -102,10 +115,36 @@ func (dsm DCSDetectorOpAvailabilityMap) compatibleWithDCSOperation(conditionStat
}
}

func (m DCSDetectorInfoMap) ToEcsDetectors() map[string]*dcspb.DetectorInfo {
out := make(map[string]*dcspb.DetectorInfo)
func fromDcsDetectorInfo(d *dcspb.DetectorInfo) ECSDetectorInfo {
return ECSDetectorInfo{
State: d.GetState().String(),
Timestamp: d.GetTimestamp(),
AllowedRunTypes: func(rts []dcspb.RunType) []string {
out := make([]string, len(rts))
for i, rt := range rts {
out[i] = rt.String()
}
return out
}(d.GetAllowedRunTypes()),
PfrAvailability: d.GetPfrAvailability().String(),
SorAvailability: d.GetSorAvailability().String(),
}
}

func (m DCSDetectorInfoMap) ToEcsDetectors() ECSDetectorInfoMap {
out := make(map[string]ECSDetectorInfo)
for k, v := range m {
out[dcsToEcsDetector(k)] = v
out[dcsToEcsDetector(k)] = fromDcsDetectorInfo(v)
}
return out
}

func (m ECSDetectorInfoMap) Filtered(detectorList []string) ECSDetectorInfoMap {
out := make(ECSDetectorInfoMap)
for _, det := range detectorList {
if _, ok := m[det]; ok {
out[det] = m[det]
}
}
return out
}

0 comments on commit a4aa3a4

Please sign in to comment.