diff --git a/server/workflow/store/sqlite_store.go b/server/workflow/store/sqlite_store.go index c3518a8f5e16..c06eda14e4c3 100644 --- a/server/workflow/store/sqlite_store.go +++ b/server/workflow/store/sqlite_store.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "sync" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -73,6 +74,7 @@ type WorkflowStore interface { type SQLiteStore struct { conn *sqlite.Conn instanceService instanceid.Service + mtx sync.Mutex } var _ WorkflowStore = &SQLiteStore{} @@ -102,6 +104,8 @@ where instanceid = ? } var workflows = wfv1.Workflows{} + s.mtx.Lock() + defer s.mtx.Unlock() err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{ Args: args, ResultFunc: func(stmt *sqlite.Stmt) error { @@ -143,6 +147,8 @@ where instanceid = ? } var total int64 + s.mtx.Lock() + defer s.mtx.Unlock() err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{ Args: args, ResultFunc: func(stmt *sqlite.Stmt) error { @@ -161,6 +167,8 @@ func (s *SQLiteStore) Add(obj interface{}) error { if !ok { return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) } + s.mtx.Lock() + defer s.mtx.Unlock() done := sqlitex.Transaction(s.conn) err := s.upsertWorkflow(wf) defer done(&err) @@ -172,6 +180,8 @@ func (s *SQLiteStore) Update(obj interface{}) error { if !ok { return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) } + s.mtx.Lock() + defer s.mtx.Unlock() done := sqlitex.Transaction(s.conn) err := s.upsertWorkflow(wf) defer done(&err) @@ -183,6 +193,8 @@ func (s *SQLiteStore) Delete(obj interface{}) error { if !ok { return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) } + s.mtx.Lock() + defer s.mtx.Unlock() return sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}}) } @@ -195,6 +207,8 @@ func (s *SQLiteStore) Replace(list []interface{}, resourceVersion string) error } wfs = append(wfs, wf) } + s.mtx.Lock() + defer s.mtx.Unlock() done := sqlitex.Transaction(s.conn) err := s.replaceWorkflows(wfs) defer done(&err) @@ -222,6 +236,7 @@ func (s *SQLiteStore) GetByKey(key string) (item interface{}, exists bool, err e } func (s *SQLiteStore) upsertWorkflow(wf *wfv1.Workflow) error { + // Called with the mutex err := sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}}) if err != nil { return err