Skip to content

Commit

Permalink
fix: mutex calls to sqlitex
Browse files Browse the repository at this point in the history
[zombiezen/go-sqlite]
(https://github.com/zombiezen/go-sqlite/blob/main/doc.go#L32) is not
thread safe when used through a single connection. The current code is
provably racing (run the server with `-race` and a few workflows being
run) and it will tell you this if you `argo list` via the server a few
times.

This change doesn't attempt to move to a multiple connection model,
it's a minimal change to stop the server crashing all the time, by
mutexing the use of the sql connection.

Fixes argoproj#13154 and argoproj#13140

Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel committed Jun 11, 2024
1 parent 465c7b6 commit 0c3bd14
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions server/workflow/store/sqlite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -73,6 +74,7 @@ type WorkflowStore interface {
type SQLiteStore struct {
conn *sqlite.Conn
instanceService instanceid.Service
mtx sync.Mutex
}

var _ WorkflowStore = &SQLiteStore{}
Expand Down Expand Up @@ -102,6 +104,8 @@ where instanceid = ?
}

var workflows = wfv1.Workflows{}
s.mtx.Lock()
defer s.mtx.Unlock()
err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{
Args: args,
ResultFunc: func(stmt *sqlite.Stmt) error {
Expand Down Expand Up @@ -143,6 +147,8 @@ where instanceid = ?
}

var total int64
s.mtx.Lock()
defer s.mtx.Unlock()
err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{
Args: args,
ResultFunc: func(stmt *sqlite.Stmt) error {
Expand All @@ -161,6 +167,8 @@ func (s *SQLiteStore) Add(obj interface{}) error {
if !ok {
return fmt.Errorf("unable to convert object to Workflow. object: %v", obj)
}
s.mtx.Lock()
defer s.mtx.Unlock()
done := sqlitex.Transaction(s.conn)
err := s.upsertWorkflow(wf)
defer done(&err)
Expand All @@ -172,6 +180,8 @@ func (s *SQLiteStore) Update(obj interface{}) error {
if !ok {
return fmt.Errorf("unable to convert object to Workflow. object: %v", obj)
}
s.mtx.Lock()
defer s.mtx.Unlock()
done := sqlitex.Transaction(s.conn)
err := s.upsertWorkflow(wf)
defer done(&err)
Expand All @@ -183,6 +193,8 @@ func (s *SQLiteStore) Delete(obj interface{}) error {
if !ok {
return fmt.Errorf("unable to convert object to Workflow. object: %v", obj)
}
s.mtx.Lock()
defer s.mtx.Unlock()
return sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}})
}

Expand All @@ -195,6 +207,8 @@ func (s *SQLiteStore) Replace(list []interface{}, resourceVersion string) error
}
wfs = append(wfs, wf)
}
s.mtx.Lock()
defer s.mtx.Unlock()
done := sqlitex.Transaction(s.conn)
err := s.replaceWorkflows(wfs)
defer done(&err)
Expand Down Expand Up @@ -222,6 +236,7 @@ func (s *SQLiteStore) GetByKey(key string) (item interface{}, exists bool, err e
}

func (s *SQLiteStore) upsertWorkflow(wf *wfv1.Workflow) error {
// Called with the mutex
err := sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}})
if err != nil {
return err
Expand Down

0 comments on commit 0c3bd14

Please sign in to comment.