From 39bbc46774ee10d0ec2a0703ac30e818ba76497e Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Mon, 4 Jan 2021 14:14:08 +0800 Subject: [PATCH] manager: fix checkpoint --- pkg/manager/manager.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 1cb8d48..60c3a5c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -51,11 +51,11 @@ type Status struct { } type WorkerCheckPoint struct { - LastInvokeTime time.Time + LastInvokeTime time.Time `json:"last_invoke_time"` } type CheckPoint struct { - workerInfo map[string]WorkerCheckPoint + WorkerInfo map[string]WorkerCheckPoint `json:"worker_info"` } // fromCheckpoint laods last invoke time from json @@ -89,7 +89,7 @@ func NewManager(config *config.Config) (*Manager, error) { if err != nil { logger.Info("failed to parse checkpoint file") } else { - for worker, info := range checkpoint.workerInfo { + for worker, info := range checkpoint.WorkerInfo { workersLastInvokeTime[worker] = info.LastInvokeTime } } @@ -120,9 +120,18 @@ func NewManager(config *config.Config) (*Manager, error) { } func (m *Manager) checkpoint() error { - file, _ := json.MarshalIndent(m.workersLastInvokeTime, "", " ") + ckptObj := &CheckPoint{WorkerInfo: make(map[string]WorkerCheckPoint)} + for k, t := range m.workersLastInvokeTime { + ckptObj.WorkerInfo[k] = WorkerCheckPoint{ + LastInvokeTime: t, + } + } + file, err := json.MarshalIndent(ckptObj, "", " ") + if err != nil { + return err + } ckpt := fmt.Sprintf("%s.tmp", m.config.Checkpoint) - err := ioutil.WriteFile(ckpt, file, 0644) + err = ioutil.WriteFile(ckpt, file, 0644) if err != nil { return err } @@ -184,6 +193,7 @@ func (m *Manager) Run() { }).Debugf("Calling RunSync() to w %s", w.GetConfig()["name"]) go w.RunSync() } + m.checkpoint() for { // wait until config.Interval seconds has elapsed select {