Skip to content

Commit

Permalink
fix: retry large archived wf. Fixes argoproj#12740 (argoproj#12741)
Browse files Browse the repository at this point in the history
Signed-off-by: heidongxianhua <[email protected]>
  • Loading branch information
heidongxianhua authored Apr 28, 2024
1 parent 30ca369 commit 6182386
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive)
wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, argoKubeOffloadNodeStatusRepo)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfaServer)}}
}

Expand Down
2 changes: 1 addition & 1 deletion server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloa
}

grpcServer := grpc.NewServer(sOpts...)
wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive)
wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, offloadNodeStatusRepo)
infopkg.RegisterInfoServiceServer(grpcServer, info.NewInfoServer(as.managedNamespace, links, columns, navColor))
eventpkg.RegisterEventServiceServer(grpcServer, eventServer)
eventsourcepkg.RegisterEventSourceServiceServer(grpcServer, eventsource.NewEventSourceServer())
Expand Down
2 changes: 1 addition & 1 deletion server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) {

archivedRepo := &mocks.WorkflowArchive{}

wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo)
wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo, offloadNodeStatusRepo)
archivedRepo.On("GetWorkflow", "", "test", "hello-world-9tql2-test").Return(&v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{Name: "hello-world-9tql2-test", Namespace: "test"},
Spec: v1alpha1.WorkflowSpec{
Expand Down
28 changes: 25 additions & 3 deletions server/workflowarchive/archived_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/auth"
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"
"github.com/argoproj/argo-workflows/v3/workflow/util"

sutils "github.com/argoproj/argo-workflows/v3/server/utils"
Expand All @@ -31,12 +32,14 @@ import (
const disableValueListRetrievalKeyPattern = "DISABLE_VALUE_LIST_RETRIEVAL_KEY_PATTERN"

type archivedWorkflowServer struct {
wfArchive sqldb.WorkflowArchive
wfArchive sqldb.WorkflowArchive
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
hydrator hydrator.Interface
}

// NewWorkflowArchiveServer returns a new archivedWorkflowServer
func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive) workflowarchivepkg.ArchivedWorkflowServiceServer {
return &archivedWorkflowServer{wfArchive: wfArchive}
func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) workflowarchivepkg.ArchivedWorkflowServiceServer {
return &archivedWorkflowServer{wfArchive, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo)}
}

func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *workflowarchivepkg.ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) {
Expand Down Expand Up @@ -282,6 +285,7 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
oriUid := wf.UID

_, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(ctx, wf.Name, metav1.GetOptions{})
if apierr.IsNotFound(err) {
Expand All @@ -299,12 +303,30 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req
}
}

log.WithFields(log.Fields{"Dehydrate workflow uid=": wf.UID}).Info("RetryArchivedWorkflow")
// If the Workflow needs to be dehydrated in order to capture and retain all of the previous state for the subsequent workflow, then do so
err = w.hydrator.Dehydrate(wf)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}

wf.ObjectMeta.ResourceVersion = ""
wf.ObjectMeta.UID = ""
result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{})
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
// if the Workflow was dehydrated before, we need to capture and maintain its previous state for the new Workflow
if !w.hydrator.IsHydrated(wf) {
offloadedNodes, err := w.offloadNodeStatusRepo.Get(string(oriUid), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
_, err = w.offloadNodeStatusRepo.Save(string(result.UID), wf.Namespace, offloadedNodes)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
}

return result, nil
}
Expand Down
8 changes: 7 additions & 1 deletion server/workflowarchive/archived_workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
authorizationv1 "k8s.io/api/authorization/v1"
Expand All @@ -17,8 +18,10 @@ import (
kubefake "k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"

"github.com/argoproj/argo-workflows/v3/persist/sqldb"
"github.com/argoproj/argo-workflows/v3/persist/sqldb/mocks"
workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
argofake "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake"
"github.com/argoproj/argo-workflows/v3/server/auth"
Expand All @@ -29,7 +32,10 @@ func Test_archivedWorkflowServer(t *testing.T) {
repo := &mocks.WorkflowArchive{}
kubeClient := &kubefake.Clientset{}
wfClient := &argofake.Clientset{}
w := NewWorkflowArchiveServer(repo)
offloadNodeStatusRepo := &mocks.OffloadNodeStatusRepo{}
offloadNodeStatusRepo.On("IsEnabled", mock.Anything).Return(true)
offloadNodeStatusRepo.On("List", mock.Anything).Return(map[sqldb.UUIDVersion]v1alpha1.Nodes{}, nil)
w := NewWorkflowArchiveServer(repo, offloadNodeStatusRepo)
allowed := true
kubeClient.AddReactor("create", "selfsubjectaccessreviews", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &authorizationv1.SelfSubjectAccessReview{
Expand Down
7 changes: 6 additions & 1 deletion util/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func IsTransientErr(err error) bool {
apierr.IsServiceUnavailable(err) ||
isTransientEtcdErr(err) ||
matchTransientErrPattern(err) ||
errors.Is(err, NewErrTransient(""))
errors.Is(err, NewErrTransient("")) ||
isTransientSqbErr(err)
if isTransient {
log.Infof("Transient error: %v", err)
} else {
Expand Down Expand Up @@ -123,3 +124,7 @@ func generateErrorString(err error) string {
}
return errorString
}

func isTransientSqbErr(err error) bool {
return strings.Contains(err.Error(), "upper: no more rows in")
}

0 comments on commit 6182386

Please sign in to comment.